精品国产人成在线_亚洲高清无码在线观看_国产在线视频国产永久2021_国产AV综合第一页一个的一区免费影院黑人_最近中文字幕MV高清在线视频

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

線程池技術簡介與Apollo線程池類源代碼分析

YB7m_Apollo_Dev ? 來源:未知 ? 作者:李倩 ? 2018-06-05 14:30 ? 次閱讀

* 本文修改后的代碼已上傳到GitHub網站Apollo項目中。

{ 1 }

線程池技術簡介

1線程池的定義

線程池是一種多線程形式,首先開啟指定數量的后臺工作線程,并將多個待執行任務添加到任務隊列,然后將隊列中的任務逐個交給空閑的工作線程執行(如下圖所示)。

2使用線程池的原因

創建/銷毀線程伴隨著操作系統的資源開銷,過于頻繁的創建/銷毀線程,會很大程度上影響處理效率。若創建線程消耗時間T1,執行任務消耗時間T2,銷毀線程消耗時間T3,如果T1+T3>T2,則開啟一個線程來執行一個任務就很不劃算,而使用線程池緩存線程,就可利用已有的閑置線程來執行新任務,有效避免T1+T3帶來的系統開銷。

線程并發數量過多,搶占系統資源從而導致阻塞。我們知道線程會共享系統資源,如果同時執行的線程數量過多,可能會導致系統資源不足而產生操作卡頓甚至出現假死現象,運用線程池能有效地控制線程最大并發數,有效避免上述問題。

對線程進行一些簡單的管理。比如:延時執行、定時循環執行等策略,運用線程池就較容易實現。

3C++中如何使用線程池

C++標準庫不提供線程池,如需使用需自行撰寫線程池類。GitHub中有多個線程池類的實現,Apollo項目也參考了其中的一個實現【https://github.com/vit-vit/CTPL】。

{ 2 }

Apollo線程池類源代碼分析

Apollo線程池文件位于[your_apollo_root_dir]/modules/common/util/ctpl_stl.h,包含任務隊列類Queue和線程池類ThreadPool,其中Queue位于命名空間apollo::common::util::detail內,ThreadPool位于命名空間apollo::common::util內。

1任務隊列類Queue

任務隊列類Queue基于C++標準庫的隊列類std::queue實現,只是對push、pop和empty三個函數進行了加鎖操作。

template class Queue { public: bool push(T const &value) { // 使用std::lock_guard效率更高 std::unique_lock lock(mutex_); q_.push(value); return true; } // deletes the retrieved element, do not use for non integral types bool pop(T &v) { // NOLINT // 使用std::lock_guard效率更高 std::unique_lock lock(mutex_); if (q_.empty()) { return false; } v = q_.front(); q_.pop(); return true; } bool empty() { // 使用std::lock_guard效率更高 std::unique_lock lock(mutex_); return q_.empty(); } private: std::queue q_; std::mutex mutex_;};

根據這篇博客【https://blog.csdn.net/tgxallen/article/details/73522233】的介紹,可使用std::lock_guard和std::unique_lock提供RAII(資源獲取即初始化,Resource Acquisition Is Initialization,參見該網頁【https://blog.csdn.net/doc_sgl/article/details/43028009】)風格的加鎖操作,其中std::lock_guard的系統開銷更小,std::unique_lock更為靈活(可適時解鎖)。就我們的任務隊列類Queue而言,不需要std::unique_lock提供的靈活性,因此使用std::lock_guard更為合適。另外,我還增加一個接受右值引用的push函數,以方便下文中的ThreadPool使用,修改后的類如下:

class Queue { public: bool push(const T &value) { std::lock_guard lock(mutex_); q_.push(value); return true; } // 增加一個接受右值引用的push函數 bool push(T &&value) { std::lock_guard lock(mutex_); q_.emplace(std::move(value)); return true; } // deletes the retrieved element, do not use for non integral types bool pop(T &v) { // NOLINT std::lock_guard lock(mutex_); if (q_.empty()) { return false; } v = q_.front(); q_.pop(); return true; } bool empty() { std::lock_guard lock(mutex_); return q_.empty(); } private: std::queue q_; std::mutex mutex_;};

2線程池類ThreadPool

線程池類ThreadPool的主要功能是創建n_threads個后臺工作線程,將任務函數f包裝成std::function的形式存入任務隊列q_,根據當前工作線程空閑情況,適時從任務隊列q_中提取一個任務函數并執行之。注意復制構造函數ThreadPool(const ThreadPool &)、移動構造函數ThreadPool(ThreadPool &&)、復制運算符ThreadPool &operator=(const ThreadPool &)、移動運算符ThreadPool &operator=(ThreadPool &&)全部設置為private,表明禁止使用這些函數。其實C++11標準完成可以通過在函數聲明后加上= delete;的方式來禁用,源代碼以注釋的方式給出了這種實現方式。

下面分析該類中幾個比較重要的成員函數。

2.2.1 Push函數

Push函數的作用是將任務函數f包裝成std::function的形式存入任務隊列q_。Push函數有兩個版本,一個允許任務函數f帶可變參數Rest &&... rest,一個不允許任務函數f帶額外參數,函數體內部代碼大同小異,下面以帶可變參數的版本進行分析,代碼如下:

template auto Push(F &&f, Rest &&... rest) -> std::future { // std::placeholders::_1表示通過std::bind函數綁定后得到的異步任務對象接受的第一個參數是自由參數 auto pck = std::make_shared>( std::bind(std::forward(f), std::placeholders::_1, std::forward(rest)...)); // 最好使用std::make_shared創建智能指針對象,后面不用操心指針內存的釋放 // auto _f = std::make_shared>([pck](int id) { (*pck)(id); }); auto _f = new std::function([pck](int id) { (*pck)(id); }); q_.push(_f); // 這里不要加鎖,否則易引起死鎖 std::unique_lock lock(mutex_); cv_.notify_one(); return pck->get_future();}

Push函數的返回值為一個std::future對象,std::future對象內存儲的數據類型由(f(0, rest...)函數的返回值類型確定,decltype(f(0, rest...))的作用就是獲取(f(0, rest...)函數的返回值類型。std::future提供一種異步操作結果的訪問機制,從字面意思來理解,它表示未來,我覺得這個名字非常貼切,因為一個異步操作的結果不可能馬上獲取,只能在未來某個時候得到。關于std::future,這篇博客【https://blog.csdn.net/yockie/article/details/50595958】講得挺不錯,大家可以借鑒。

因為任務函數f的聲明各式各樣,有的不帶參數,有的接受一個參數,有的接受兩個參數……因此不能將其直接存儲到任務隊列q_,于是先利用std::bind函數將其包裝為一個異步操作任務std::packaged_task對象pck(接受一個整型參數,返回值類型為(f(0, rest...)函數的返回值類型),再利用Lambda表達式將pck包裝為一個std::function對象,這樣就可以存儲到任務隊列q_中了。這里原作者直接使用new運算符創建裸指針_f,后面還需想辦法釋放指針內存,我認為不是很合適,使用std::make_shared創建智能指針可以自動管理內存,更加省事,但使用std::shared_ptr>智能指針就不能使用Queue::push(const T &value)版本將其存儲到任務隊列,為此我在Queue類中添加了一個接受右值引用參數的版本Queue::push(T &&value),使用該版本就可以順利將智能指針存儲進去了。

接下來,使用條件變量std::condition_variable對象cv_.notify_one()函數通知各個線程任務隊列已經發生了改變,讓空閑線程趕緊從任務隊列中拉取新任務執行;最后通過pck->get_future()返回一個std::future對象,以便調用者能從中取出函數執行完畢后的返回值。

我看過很多C++多線程方面的書籍(”C++ Concurrency in Action”比較經典),一般不對cv_.notify_one();進行加鎖操作,因為這樣做除了降低效率外,還很容易引起死鎖,故需去除加鎖操作,具體原因參見該網頁【https://stackoverflow.com/questions/17101922/do-i-have-to-acquire-lock-before-calling-condition-variable-notify-one】以及另一個網頁【http://en.cppreference.com/w/cpp/thread/condition_variable/notify_one】。

以下是修改后的版本:

template auto Push(F &&f, Rest &&... rest) -> std::future { auto pck = std::make_shared>( std::bind(std::forward(f), std::placeholders::_1, std::forward(rest)...)); auto _f = std::make_shared>( [pck](int id) { (*pck)(id); }); // It is not necessary to lock q_ because it is locked in the Queue class. q_.push(std::move(_f)); cv_.notify_one(); return pck->get_future();}

2.2.2Pop函數

Pop函數的作用是從任務隊列q_中取出并返回一個任務,代碼如下:

std::function Pop() { std::function *_f = nullptr; q_.pop(_f); // 如果任務隊列q_中存儲的是智能指針,就不必使用這種小花招來釋放內存了。 std::unique_ptr> func( _f); // at return, delete the function even if an exception occurred std::function f; if (_f) f = *_f; return f;}

首先,從從任務隊列q_中取出一個任務函數對象的裸指針_f,若非空,則將其賦值給std::function f并返回。該函數里使用一個小花招,即創建一個智能指針std::unique_ptr> func(_f),當超出該對象的作用域時,就會在其析構函數中調用delete運算符釋放內存。如果任務隊列q_中存儲的是智能指針,就不必使用這種小花招來釋放內存了。

以下是修改后的版本:

std::shared_ptr> Pop() { std::shared_ptr> f; q_.pop(f); return f;}

2.2.3Stop函數

Stop函數的作用停止線程池工作,若不允許等待,則直接停止當前正在執行的工作線程,同時清空任務隊列;若允許等待,則等待當前正在執行的工作線程完成,代碼如下:

void Stop(bool is_wait = false) { if (!is_wait) { if (is_stop_) { return; } is_stop_ = true; for (int i = 0, n = size(); i < n; ++i) { ? ? ? ?*(flags_[i]) = true; ?// command the threads to stop ? ? ?} ? ? ?ClearQueue(); ?// empty the queue ? ?} else { ? ? ?if (is_done_ || is_stop_) return; ? ? ?is_done_ = true; ?// give the waiting threads a command to finish ? ?} ? ?{ ? ? ?// 這里不要加鎖,否則易引起死鎖 ? ? ? ?std::unique_lock lock(mutex_); cv_.notify_all(); // stop all waiting threads } for (int i = 0; i < static_cast(threads_.size()); ++i) { // wait for the computing threads to finish if (threads_[i]->joinable()) { threads_[i]->join(); } } // if there were no threads in the pool but some functions in the queue, the // functions are not deleted by the threads // therefore delete them here ClearQueue(); threads_.clear(); flags_.clear();}

函數中的布爾變量is_stop_、is_done_、flags_[i]為什么都不用加鎖呢?這是因為它們都是原子類型std::atomic,所謂原子類型就是一條CPU指令就能完成取值或寫值操作的變量類型。C++標準可保證std::atomic類型變量在任何架構操作系統中均只使用一條CPU指令就可完成取值或寫值操作,其他形如std::atomic的類型,雖然將其聲明為原子類型,但在某些架構操作系統中,并不能只使用一條CPU指令完成取值或寫值操作。綜上所述,std::atomic類型的變量可以在多線程中不加鎖操作。

根據2.2.1節的分析,cv_.notify_all();的加鎖操作應去除。

修改后的代碼如下:

void Stop(bool is_wait = false) { if (!is_wait) { if (is_stop_) { return; } is_stop_ = true; for (int i = 0, n = size(); i < n; ++i) { ? ? ? ?*(flags_[i]) = true; ?// command the threads to stop ? ? ?} ? ? ?ClearQueue(); ?// empty the queue ? ?} else { ? ? ?if (is_done_ || is_stop_) return; ? ? ?is_done_ = true; ?// give the waiting threads a command to finish ? ?} ? ?cv_.notify_all(); ?// stop all waiting threads ? ?for (int i = 0; i < static_cast(threads_.size()); ++i) { // wait for the computing threads to finish if (threads_[i]->joinable()) { threads_[i]->join(); } } // if there were no threads in the pool but some functions in the queue, the // functions are not deleted by the threads // therefore delete them here ClearQueue(); threads_.clear(); flags_.clear();}

2.2.4ClearQueue函數

ClearQueue函數的作用是清空任務隊列q_,代碼如下:

void ClearQueue() { std::function *_f; // empty the queue while (q_.pop(_f)) { delete _f; }}

使用while循環從任務隊列q_中逐個彈出任務函數指針_f,因為_f使用new運算符創建,故需使用delete運算符刪除以釋放內存。如果任務隊列q_中存儲的是智能指針,就不必手工刪除對象來釋放內存了。

以下是使用智能指針的版本:

void ClearQueue() { std::shared_ptr> f; // empty the queue while (q_.pop(f)) { // do nothing } }

2.2.5Resize函數

Resize函數的作用是更改線程池內工作線程的數量,代碼如下:

void Resize(const int n_threads) { if (!is_stop_ && !is_done_) { int old_n_threads = static_cast(threads_.size()); if (old_n_threads <= ? ? ? ? ?n_threads) { ?// if the number of threads is increased ? ? ? ?threads_.resize(n_threads); ? ? ? ?flags_.resize(n_threads); ? ? ? ?for (int i = old_n_threads; i < n_threads; ++i) { ? ? ? ? ?flags_[i] = std::make_shared>(false); SetThread(i); } } else { // the number of threads is decreased for (int i = old_n_threads - 1; i >= n_threads; --i) { *(flags_[i]) = true; // this thread will finish threads_[i]->detach(); } { // stop the detached threads that were waiting // 這里不要加鎖,否則易引起死鎖 std::unique_lock lock(mutex_); cv_.notify_all(); } // safe to delete because the threads are detached threads_.resize(n_threads); // safe to delete because the threads // have copies of shared_ptr of the // flags, not originals flags_.resize(n_threads); } }}

如果兩個變量is_stop_、is_done_都不為真,表明線程池仍在使用,可以更改線程池內工作線程的數量,否則沒必要對一個停用的線程池更改工作線程的數量。若新線程數n_threads大于當前的工作線程數old_n_threads,則將工作線程數組threads_和線程標志數組flags_的尺寸修改為新數目,同時使用for循環調用SetThread(i)函數逐個重新創建工作線程;若新線程數n_threads小于當前的工作線程數old_n_threads,則將先完成old_n_threads - n_threads個線程正在執行的任務,之后將工作線程數組threads_和線程標志數組flags_的尺寸修改為新數目。

根據2.2.1節的分析,cv_.notify_all();的加鎖操作應去除,具體原因參見該網頁

注意:Resize函數很危險,應盡量少調用,若必須調用,則應當在創建線程池的那個線程內調用,而不要在其他線程中調用。

修改的代碼如下:

void Resize(const int n_threads) { if (!is_stop_ && !is_done_) { int old_n_threads = static_cast(threads_.size()); if (old_n_threads <= ? ? ? ? ?n_threads) { ?// if the number of threads is increased ? ? ? ?threads_.resize(n_threads); ? ? ? ?flags_.resize(n_threads); ? ? ? ?for (int i = old_n_threads; i < n_threads; ++i) { ? ? ? ? ?flags_[i] = std::make_shared>(false); SetThread(i); } } else { // the number of threads is decreased for (int i = old_n_threads - 1; i >= n_threads; --i) { *(flags_[i]) = true; // this thread will finish threads_[i]->detach(); } // stop the detached threads that were waiting cv_.notify_all(); // safe to delete because the threads are detached threads_.resize(n_threads); // safe to delete because the threads // have copies of shared_ptr of the // flags, not originals flags_.resize(n_threads); } }}

2.2.6SetThread函數

SetThread函數的作用重新創建指定序號i的工作線程,代碼如下:

void SetThread(int i) { std::shared_ptr> flag( flags_[i]); // a copy of the shared ptr to the flag auto f = [this, i, flag /* a copy of the shared ptr to the flag */]() { std::atomic &_flag = *flag; std::function *_f; bool is_pop_ = q_.pop(_f); while (true) { while (is_pop_) { // if there is anything in the queue // 如果任務隊列q_中存儲的是智能指針,就不必使用這種小花招來釋放內存了。 std::unique_ptr> func( _f); // at return, delete the function even if an exception // occurred // 執行任務函數 (*_f)(i); if (_flag) { // the thread is wanted to stop, return even if the queue is not // empty yet return; } else { is_pop_ = q_.pop(_f); } } // the queue is empty here, wait for the next command // 這里必須使用std::unique_lock,因為后面條件變量cv_等待期間,需要解鎖。 std::unique_lock lock(mutex_); ++n_waiting_; // 等待任務隊列傳來的新任務 cv_.wait(lock, [this, &_f, &is_pop_, &_flag]() { is_pop_ = q_.pop(_f); return is_pop_ || is_done_ || _flag; }); --n_waiting_; if (!is_pop_) { // if the queue is empty and is_done_ == true or *flag // then return return; } } }; threads_[i].reset( new std::thread(f)); // compiler may not support std::make_unique() }

上述代碼看起來比較復雜,實際上只有三條語句,第一條是std::shared_ptr> flag(flags_[i]);,即使用flags_[i]來初始化標志變量flag;第二條看起來很長,實際上就是創建一個Lambda表達式變量f;第三條是threads_[i].reset(new std::thread(f));,使用Lambda表達式變量f作為工作線程的任務函數,創建序號為i的工作線程。

那么Lambda表達式變量f何時啟動呢?當任務隊列q_.pop(_f)的返回值為true時,表明從任務隊列q_中取到了一個新任務,于是調用(*_f)(i);執行之,如果當前任務隊列沒有任務,則使用:

cv_.wait(lock, [this, &_f, &is_pop_, &_flag]() { is_pop_ = q_.pop(_f); return is_pop_ || is_done_ || _flag;});

等待新任務的到來,在新任務到來之前,當前工作線程處于休眠狀態。

該函數同樣使用一個小花招,即創建一個智能指針std::unique_ptr> func(_f),當超出該對象的作用域時,就會在其析構函數中調用delete運算符釋放內存。如果任務隊列q_中存儲的是智能指針,就不必使用這種小花招來釋放內存了。

2.2.7修改后的ThreadPool類代碼

為完整起見,這里給出修改后的ThreadPool類代碼。

class ThreadPool { public: ThreadPool() { Init(); } explicit ThreadPool(int n_threads) { Init(); Resize(n_threads); } // the destructor waits for all the functions in the queue to be finished ~ThreadPool() { Stop(true); } // get the number of running threads in the pool int size() { return static_cast(threads_.size()); } // number of idle threads int NumIdle() { return n_waiting_; } std::thread &GetThread(const int i) { return *(threads_[i]); } // change the number of threads in the pool // should be called from one thread, otherwise be careful to not interleave, // also with stop() // n_threads must be >= 0 void Resize(const int n_threads) { if (!is_stop_ && !is_done_) { int old_n_threads = static_cast(threads_.size()); if (old_n_threads <= ? ? ? ? ?n_threads) { ?// if the number of threads is increased ? ? ? ?threads_.resize(n_threads); ? ? ? ?flags_.resize(n_threads); ? ? ? ?for (int i = old_n_threads; i < n_threads; ++i) { ? ? ? ? ?flags_[i] = std::make_shared>(false); SetThread(i); } } else { // the number of threads is decreased for (int i = old_n_threads - 1; i >= n_threads; --i) { *(flags_[i]) = true; // this thread will finish threads_[i]->detach(); } // stop the detached threads that were waiting cv_.notify_all(); // safe to delete because the threads are detached threads_.resize(n_threads); // safe to delete because the threads // have copies of shared_ptr of the // flags, not originals flags_.resize(n_threads); } } } // empty the queue void ClearQueue() { std::shared_ptr> f; // empty the queue while (q_.pop(f)) { // do nothing } } // pops a functional wrapper to the original function std::shared_ptr> Pop() { std::shared_ptr> f; q_.pop(f); return f; } // wait for all computing threads to finish and stop all threads // may be called asynchronously to not pause the calling thread while waiting // if is_wait == true, all the functions in the queue are run, otherwise the // queue is cleared without running the functions void Stop(bool is_wait = false) { if (!is_wait) { if (is_stop_) { return; } is_stop_ = true; for (int i = 0, n = size(); i < n; ++i) { ? ? ? ?*(flags_[i]) = true; ?// command the threads to stop ? ? ?} ? ? ?ClearQueue(); ?// empty the queue ? ?} else { ? ? ?if (is_done_ || is_stop_) return; ? ? ?is_done_ = true; ?// give the waiting threads a command to finish ? ?} ? ?cv_.notify_all(); ?// stop all waiting threads ? ?for (int i = 0; i < static_cast(threads_.size()); ++i) { // wait for the computing threads to finish if (threads_[i]->joinable()) { threads_[i]->join(); } } // if there were no threads in the pool but some functions in the queue, the // functions are not deleted by the threads // therefore delete them here ClearQueue(); threads_.clear(); flags_.clear(); } template auto Push(F &&f, Rest &&... rest) -> std::future { auto pck = std::make_shared>( std::bind(std::forward(f), std::placeholders::_1, std::forward(rest)...)); auto _f = std::make_shared>( [pck](int id) { (*pck)(id); }); // It is not necessary to lock q_ because it is locked in the Queue class. q_.push(std::move(_f)); cv_.notify_one(); return pck->get_future(); } // run the user's function that excepts argument int - id of the running // thread. returned value is templatized // operator returns std::future, where the user can get the result and rethrow // the catched exceptins template auto Push(F &&f) -> std::future { auto pck = std::make_shared>( std::forward(f)); auto _f = std::make_shared>( [pck](int id) { (*pck)(id); }); // It is not necessary to lock q_ because it is locked in the Queue class. q_.push(std::move(_f)); cv_.notify_one(); return pck->get_future(); } private: // deleted ThreadPool(const ThreadPool &); // = delete; ThreadPool(ThreadPool &&); // = delete; ThreadPool &operator=(const ThreadPool &); // = delete; ThreadPool &operator=(ThreadPool &&); // = delete; void SetThread(int i) { std::shared_ptr> flag( flags_[i]); // a copy of the shared ptr to the flag auto f = [this, i, flag /* a copy of the shared ptr to the flag */]() { std::atomic &_flag = *flag; std::shared_ptr> _f; bool is_pop_ = q_.pop(_f); while (true) { while (is_pop_) { // if there is anything in the queue (*_f)(i); if (_flag) { // the thread is wanted to stop, return even if the queue is not // empty yet return; } else { is_pop_ = q_.pop(_f); } } // the queue is empty here, wait for the next command { std::unique_lock lock(mutex_); ++n_waiting_; cv_.wait(lock, [this, &_f, &is_pop_, &_flag]() { is_pop_ = q_.pop(_f); return is_pop_ || is_done_ || _flag; }); --n_waiting_; if (!is_pop_) { // if the queue is empty and is_done_ == true or *flag // then return return; } } } }; threads_[i].reset( new std::thread(f)); // compiler may not support std::make_unique() } void Init() { is_stop_ = false; is_done_ = false; n_waiting_ = 0; } std::vector> threads_; std::vector>> flags_; detail::Queue>> q_; std::atomic is_done_; std::atomic is_stop_; std::atomic n_waiting_; // how many threads are waiting std::mutex mutex_; std::condition_variable cv_;};

2.2.8增加的單元測試代碼

為檢驗修改后代碼的正確性,增添如下單元測試代碼。第一個待測試函數filter_duplicates_str接受的第一個參數為一個整型ID值,我在測試代碼中只是將其作為一個占位符,實際并未使用,后面接受四個C風格字符串,該函數的任務是去除四個字符串中的重復詞并把去重后的結果按字母升序排列,結果以std::string的形式返回;第二個待測試函數filter_duplicates只接受的一個整型ID值參數,我在測試代碼中只是將其作為一個占位符,實際并未使用,該函數的任務是去除一串固定字符串中的重復詞并把去重后的結果按字母升序排列,結果以std::string的形式返回。因為C++編譯器不能推導出重載函數的正確版本,因此第二個待測函數并未使用重載函數形式。兩個待測函數均使用線程池執行1000次,最后檢查返回結果與預期結果的一致性。

#include "modules/common/util/ctpl_stl.h"#include #include #include #include #include #include #include "gtest/gtest.h"namespace apollo {namespace common {namespace util {namespace {// ...// Attention: don't use overloaded functions, otherwise the compiler can't// deduce the correct edition.std::string filter_duplicates_str(int id, const char* str1, const char* str2, const char* str3, const char* str4) { // id is unused. std::stringstream ss_in; ss_in << str1 << " " << str2 << " " << str3 << " " << str4; ?std::set string_set; std::istream_iterator beg(ss_in); std::istream_iterator end; std::copy(beg, end, std::inserter(string_set, string_set.end())); std::stringstream ss_out; std::copy(std::begin(string_set), std::end(string_set), std::ostream_iterator(ss_out, " ")); return ss_out.str();}std::string filter_duplicates(int id) { // id is unused. std::stringstream ss_in; ss_in << "a a b b b c foo foo bar foobar foobar hello world hello hello world"; ?std::set string_set; std::istream_iterator beg(ss_in); std::istream_iterator end; std::copy(beg, end, std::inserter(string_set, string_set.end())); std::stringstream ss_out; std::copy(std::begin(string_set), std::end(string_set), std::ostream_iterator(ss_out, " ")); return ss_out.str();}} // namespaceTEST(ThreadPool, filter_duplicates) { const unsigned int hardware_threads = std::thread::hardware_concurrency(); const unsigned int threads = std::min(hardware_threads != 0 ? hardware_threads : 2, 50U); ThreadPool p(threads); std::vector> futures1, futures2; for (int i = 0; i < 1000; ++i) { ? ?futures1.push_back(std::move(p.Push( ? ? ? ?filter_duplicates_str, "thread pthread", "pthread thread good news", ? ? ? ?"today is a good day", "she is a six years old girl"))); ? ?futures2.push_back(std::move(p.Push(filter_duplicates))); ?} ?for (int i = 0; i < 1000; ++i) { ? ?std::string result1 = futures1[i].get(); ? ?std::string result2 = futures2[i].get(); ? ?EXPECT_STREQ( ? ? ? ?result1.c_str(), ? ? ? ?"a day girl good is news old pthread she six thread today years "); ? ?EXPECT_STREQ(result2.c_str(), "a b bar c foo foobar hello world "); ?}}} ?// namespace util} ?// namespace common} ?// namespace apollo

{ 3 }

Apollo Planning模塊對于線程池的使用分析

Apollo Planning模塊通過PlanningThreadPool類來完成對線程池ThreadPool的包裝調用。PlanningThreadPool類位于頭文件[your_apollo_root_dir]/modules/planning/common/planning_thread_pool.h及對應的實現文件[your_apollo_root_dir]/modules/planning/common/planning_thread_pool.cc中,位于命名空間apollo::planning內。

1PlanningThreadPool類

PlanningThreadPool類的聲明如下:

class PlanningThreadPool { public: void Init(); void Stop() { if (thread_pool_) { thread_pool_->Stop(true); } } template void Push(F &&f, Rest &&... rest) { func_.push_back(std::move(thread_pool_->Push(f, rest...))); } template void Push(F &&f) { func_.push_back(std::move(thread_pool_->Push(f))); } void Synchronize(); private: std::unique_ptr thread_pool_; bool is_initialized = false; // 這里的func_用得非常不恰當,因為這里保存的是std::future對象, // 而非std::function對象,將其修改為futures_很有必要。 std::vector> func_; DECLARE_SINGLETON(PlanningThreadPool);};

PlanningThreadPool通過宏DECLARE_SINGLETON定義一個單實例類,因此不能直接在棧(stack)和堆(heap)上創建該類對象,而只能通過PlanningThreadPool::instance()獲取該類的唯一實例。該類中的成員變量func_非常具有誤導性,實際上它是一個保存著多個std::future對象的動態數組,而不是保存std::function對象,也就是說它保存的是函數的異步返回值對象,而非異步函數對象本身,因此這里將其修改為futures_很有必要。

2PlanningThreadPool類的使用

在Planning模塊使用PlanningThreadPool類的步驟如下:

3.2.1初始化線程池

在Planning::Init()函數(位于[your_apollo_root_dir]/modules/planning/planning.cc)中添加如下語句完成PlanningThreadPool類對象的初始化:

// initialize planning thread pool PlanningThreadPool::instance()->Init();

3.2.2 利用線程池完成并發處理

在合適的位置調用線程池完成某個功能的并發處理,一般而言是在某個循環體內。注意:需進行并發處理的任務,相互之間不能有先后依賴關系,因為使用線程池執行并發任務時根本不知道哪個任務會先執行,哪個任務會后執行。

Planning模塊目前在以下幾處使用了線程池:

ReferenceLineInfo::AddObstacles函數ReferenceLineInfo::AddObstacles函數(位于[your_apollo_root_dir]/modules/planning/common/reference_line_info.cc中)在for循環內使用PlanningThreadPool::instance()->Push添加線程池任務,用于增加當前的障礙物信息,使用PlanningThreadPool::instance()->Synchronize()等待線程池任務全部完成。

bool ReferenceLineInfo::AddObstacles(const std::vector& obstacles) {if (FLAGS_use_multi_thread_to_add_obstacles) {std::vector ret(obstacles.size(), 0);for (size_t i = 0; i < obstacles.size(); ++i) { ?const auto* obstacle = obstacles.at(i); ?PlanningThreadPool::instance()->Push(std::bind( &ReferenceLineInfo::AddObstacleHelper, this, obstacle, &(ret[i])));}PlanningThreadPool::instance()->Synchronize();if (std::find(ret.begin(), ret.end(), 0) != ret.end()) { return false;}} else {// ...}return true;}

DPRoadGraph::GenerateMinCostPath函數DPRoadGraph::GenerateMinCostPath函數(位于[your_apollo_root_dir]/modules/planning/tasks/dp_poly_path/dp_road_graph.cc中)在每級航點(way point)上多個橫向采樣點的for循環內使用PlanningThreadPool::instance()->Push添加線程池任務,用于計算本級航點的最小代價,使用PlanningThreadPool::instance()->Synchronize()等待線程池任務全部完成。

bool DPRoadGraph::GenerateMinCostPath( const std::vector &obstacles, std::vector *min_cost_path) { // ... for (std::size_t level = 1; level < path_waypoints.size(); ++level) { ? ?const auto &prev_dp_nodes = graph_nodes.back(); ? ?const auto &level_points = path_waypoints[level]; ? ?graph_nodes.emplace_back(); ? ?for (size_t i = 0; i < level_points.size(); ++i) { ? ? ?const auto &cur_point = level_points[i]; ? ? ?graph_nodes.back().emplace_back(cur_point, nullptr); ? ? ?auto &cur_node = graph_nodes.back().back(); ? ? ?if (FLAGS_enable_multi_thread_in_dp_poly_path) { ? ? ? ?PlanningThreadPool::instance()->Push(std::bind( &DPRoadGraph::UpdateNode, this, std::ref(prev_dp_nodes), level, total_level, &trajectory_cost, &(front), &(cur_node))); } else { UpdateNode(prev_dp_nodes, level, total_level, &trajectory_cost, &front, &cur_node); } } if (FLAGS_enable_multi_thread_in_dp_poly_path) { PlanningThreadPool::instance()->Synchronize(); } } // ...}

DpStGraph::CalculateTotalCost函數DpStGraph::CalculateTotalCost函數(位于[your_apollo_root_dir]/modules/planning/tasks/dp_st_speed/dp_st_graph.cc中)在for循環內使用PlanningThreadPool::instance()->Push添加線程池任務,對于時間采樣值c上的不同距離采樣值r: next_lowest_row<=r<=next_highest_row計算抵達節點(c, r)的最小總代價,使用PlanningThreadPool::instance()->Synchronize()等待線程池任務全部完成。

Status DpStGraph::CalculateTotalCost() { // col and row are for STGraph // t corresponding to col // s corresponding to row uint32_t next_highest_row = 0; uint32_t next_lowest_row = 0; for (size_t c = 0; c < cost_table_.size(); ++c) { ? ?int highest_row = 0; ? ?int lowest_row = cost_table_.back().size() - 1; ? ?for (uint32_t r = next_lowest_row; r <= next_highest_row; ++r) { ? ? ?if (FLAGS_enable_multi_thread_in_dp_st_graph) { ? ? ? ?PlanningThreadPool::instance()->Push( std::bind(&DpStGraph::CalculateCostAt, this, c, r)); } else { CalculateCostAt(c, r); } } if (FLAGS_enable_multi_thread_in_dp_st_graph) { PlanningThreadPool::instance()->Synchronize(); } for (uint32_t r = next_lowest_row; r <= next_highest_row; ++r) { ? ? ?const auto& cost_cr = cost_table_[c][r]; ? ? ?if (cost_cr.total_cost() < std::numeric_limits::infinity()) { int h_r = 0; int l_r = 0; GetRowRange(cost_cr, &h_r, &l_r); highest_row = std::max(highest_row, h_r); lowest_row = std::min(lowest_row, l_r); } } next_highest_row = highest_row; next_lowest_row = lowest_row; } return Status::OK();}

3.2.3 銷毀線程池

在Planning::Stop()函數(位于[your_apollo_root_dir]/modules/planning/planning.cc)中添加如下語句以便 銷毀線程池:

1 PlanningThreadPool::instance()->Stop();

自Apollo平臺開放已來,我們收到了大量開發者的咨詢和反饋,越來越多開發者基于Apollo擦出了更多的火花,并愿意將自己的成果貢獻出來,這充分體現了Apollo『貢獻越多,獲得越多』的開源精神。為此我們開設了『開發者說』板塊,希望開發者們能夠踴躍投稿,更好地為廣大自動駕駛開發者營造一個共享交流的平臺!

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 源代碼
    +關注

    關注

    96

    文章

    2944

    瀏覽量

    66671
  • 線程池
    +關注

    關注

    0

    文章

    57

    瀏覽量

    6836
  • Apollo
    +關注

    關注

    5

    文章

    340

    瀏覽量

    18407

原文標題:開發者說 | Apollo項目線程池技術淺析

文章出處:【微信號:Apollo_Developers,微信公眾號:Apollo開發者社區】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦

    Java中的線程包括哪些

    java.util.concurrent 包來實現的,最主要的就是 ThreadPoolExecutor 。 Executor: 代表線程的接口,有一個 execute() 方法,給一個 Runnable 類型對象
    的頭像 發表于 10-11 15:33 ?787次閱讀
    Java中的<b class='flag-5'>線程</b><b class='flag-5'>池</b>包括哪些

    線程是如何實現的

    線程的概念是什么?線程是如何實現的?
    發表于 02-28 06:20

    基于線程技術集群接入點的應用研究

    本文在深入研究高級線程技術的基礎上,分析、研究了固定線程數目的線程
    發表于 01-22 14:21 ?5次下載

    java自帶的線程方法

    二、原理分析 從上面使用線程的例子來看,最主要就是兩步,構造ThreadPoolExecutor對象,然后每來一個任務,就調用ThreadPoolExecutor對象的execute方法。 1
    發表于 09-27 11:06 ?0次下載

    基于Nacos的簡單動態化線程實現

    本文以Nacos作為服務配置中心,以修改線程核心線程數、最大線程數為例,實現一個簡單的動態化線程
    發表于 01-06 14:14 ?846次閱讀

    線程線程

    線程通常用于服務器應用程序。 每個傳入請求都將分配給線程池中的一個線程,因此可以異步處理請求,而不會占用主線程,也不會延遲后續請求的處理
    的頭像 發表于 02-28 09:53 ?753次閱讀
    多<b class='flag-5'>線程</b>之<b class='flag-5'>線程</b><b class='flag-5'>池</b>

    Java線程核心原理

    看過Java線程源碼的小伙伴都知道,在Java線程池中最核心的就是ThreadPoolExecutor,
    的頭像 發表于 04-21 10:24 ?836次閱讀

    細數線程的10個坑

    JDK開發者提供了線程的實現,我們基于Executors組件,就可以快速創建一個線程
    的頭像 發表于 06-16 10:11 ?708次閱讀
    細數<b class='flag-5'>線程</b><b class='flag-5'>池</b>的10個坑

    線程線程怎么釋放

    線程分組看,pool名開頭線程占616條,而且waiting狀態也是616條,這個點就非常可疑了,我斷定就是這個pool開頭線程導致的問題。我們先排查為何這個
    發表于 07-31 10:49 ?2247次閱讀
    <b class='flag-5'>線程</b><b class='flag-5'>池</b>的<b class='flag-5'>線程</b>怎么釋放

    Spring 的線程應用

    。 使用@Async聲明多線程 SpringBoot 提供了注解 @Async 來使用線程, 具體使用方法如下: 在啟動(配置)添加
    的頭像 發表于 10-13 10:47 ?595次閱讀
    Spring 的<b class='flag-5'>線程</b><b class='flag-5'>池</b>應用

    線程基本概念與原理

    一、線程基本概念與原理 1.1 線程概念及優勢 C++線程
    的頭像 發表于 11-10 10:24 ?488次閱讀

    線程的基本概念

    線程的基本概念 不管線程是什么東西!但是我們必須知道線程被搞出來的目的就是:提高程序執行效
    的頭像 發表于 11-10 16:37 ?500次閱讀
    <b class='flag-5'>線程</b><b class='flag-5'>池</b>的基本概念

    線程七大核心參數執行順序

    線程是一種用于管理和調度線程執行的技術,通過將任務分配到線程池中的線程進行處理,可以有效地控制
    的頭像 發表于 12-04 16:45 ?973次閱讀

    線程的創建方式有幾種

    線程是一種用于管理和調度線程技術,能夠有效地提高系統的性能和資源利用率。它通過預先創建一組線程并維護一個工作隊列,將任務提交給
    的頭像 發表于 12-04 16:52 ?828次閱讀

    什么是動態線程?動態線程的簡單實現思路

    因此,動態可監控線程一種針對以上痛點開發的線程管理工具。主要可實現功能有:提供對 Spring 應用內線程
    的頭像 發表于 02-28 10:42 ?594次閱讀