C++11并發(fā)學(xué)習(xí)筆記:為什么要使用線程池
為什么要使用線程池?
? ? ? ?目前的大多數(shù)網(wǎng)絡(luò)服務(wù)器,包括Web服務(wù)器、Email服務(wù)器以及數(shù)據(jù)庫服務(wù)器等都具有一個共同點(diǎn),就是單位時間內(nèi)必須處理數(shù)目巨大的連接請求,但處理時間卻相對較短。
? ? ? ?傳統(tǒng)多線程方案中我們采用的服務(wù)器模型則是一旦接受到請求之后,即創(chuàng)建一個新的線程,由該線程執(zhí)行任務(wù)。任務(wù)執(zhí)行完畢后,線程退出,這就是是“即時創(chuàng)建,即時銷毀”的策略。盡管與創(chuàng)建進(jìn)程相比,創(chuàng)建線程的時間已經(jīng)大大的縮短,但是如果提交給線程的任務(wù)是執(zhí)行時間較短,而且執(zhí)行次數(shù)極其頻繁,那么服務(wù)器將處于不停的創(chuàng)建線程,銷毀線程的狀態(tài)。
我們將傳統(tǒng)方案中的線程執(zhí)行過程分為三個過程:T1、T2、T3。
T1:線程創(chuàng)建時間
T2:線程執(zhí)行時間,包括線程的同步等時間
T3:線程銷毀時間
? ? ? ?那么我們可以看出,線程本身的開銷所占的比例為(T1+T3) / (T1+T2+T3)。如果線程執(zhí)行的時間很短的話,這比開銷可能占到20%-50%左右。如果任務(wù)執(zhí)行時間很長的話,這筆開銷將是不可忽略的。
? ? ? ?除此之外,線程池能夠減少創(chuàng)建的線程個數(shù)。通常線程池所允許的并發(fā)線程是有上界的,如果同時需要并發(fā)的線程數(shù)超過上界,那么一部分線程將會等待。而傳統(tǒng)方案中,如果同時請求數(shù)目為2000,那么最壞情況下,系統(tǒng)可能需要產(chǎn)生2000個線程。盡管這不是一個很大的數(shù)目,但是也有部分機(jī)器可能達(dá)不到這種要求。
? ? ? ?因此線程池的出現(xiàn)正是著眼于減少線程本身帶來的開銷。線程池采用預(yù)創(chuàng)建的技術(shù),在應(yīng)用程序啟動之后,將立即創(chuàng)建一定數(shù)量的線程(N1),放入空閑隊(duì)列中。這些線程都是處于阻塞(Suspended)狀態(tài),不消耗CPU,但占用較小的內(nèi)存空間。當(dāng)任務(wù)到來后,緩沖池選擇一個空閑線程,把任務(wù)傳入此線程中運(yùn)行。當(dāng)N1個線程都在處理任務(wù)后,緩沖池自動創(chuàng)建一定數(shù)量的新線程,用于處理更多的任務(wù)。在任務(wù)執(zhí)行完畢后線程也不退出,而是繼續(xù)保持在池中等待下一次的任務(wù)。當(dāng)系統(tǒng)比較空閑時,大部分線程都一直處于暫停狀態(tài),線程池自動銷毀一部分線程,回收系統(tǒng)資源。
? ? ? 基于這種預(yù)創(chuàng)建技術(shù),線程池將線程創(chuàng)建和銷毀本身所帶來的開銷分?jǐn)偟搅烁鱾€具體的任務(wù)上,執(zhí)行次數(shù)越多,每個任務(wù)所分擔(dān)到的線程本身開銷則越小,不過我們另外可能需要考慮進(jìn)去線程之間同步所帶來的開銷
線程池適合場景
? ? ? ?事實(shí)上,線程池并不是萬能的。它有其特定的使用場合。線程池致力于減少線程本身的開銷對應(yīng)用所產(chǎn)生的影響,這是有前提的,前提就是線程本身開銷與線程執(zhí)行任務(wù)相比不可忽略。如果線程本身的開銷相對于線程任務(wù)執(zhí)行開銷而言是可以忽略不計的,那么此時線程池所帶來的好處是不明顯的,比如對于FTP服務(wù)器以及Telnet服務(wù)器,通常傳送文件的時間較長,開銷較大,那么此時,我們采用線程池未必是理想的方法,我們可以選擇“即時創(chuàng)建,即時銷毀”的策略。
總之線程池通常適合下面的幾個場合:
(1)單位時間內(nèi)處理任務(wù)頻繁而且任務(wù)處理時間短
(2)對實(shí)時性要求較高。如果接受到任務(wù)后在創(chuàng)建線程,可能滿足不了實(shí)時要求,因此必須采用線程池進(jìn)行預(yù)創(chuàng)建。
首先感謝github上大神的分享:https://github.com/progschj/ThreadPool
代碼非常的簡潔,只有一個頭文件ThreadPool.h,這里貼出來作為備份。
#ifndef?THREAD_POOL_H
#define?THREAD_POOL_H
#include#include#include#include#include#include#include#include#includeclass?ThreadPool?{
public:
????ThreadPool(size_t);
????templateauto?enqueue(F&&?f,?Args&&...?args)?
????????->?std::future<typename?std::result_of::type>;
????~ThreadPool();
private:
????//?need?to?keep?track?of?threads?so?we?can?join?them
????std::vector<?std::thread?>?workers;
????//?the?task?queue
????std::queue<?std::function>?tasks;
????
????//?synchronization
????std::mutex?queue_mutex;
????std::condition_variable?condition;
????bool?stop;
};
?
//?the?constructor?just?launches?some?amount?of?workers
inline?ThreadPool::ThreadPool(size_t?threads)
????:???stop(false)
{
????for(size_t?i?=?0;i<threads;++i)
????????workers.emplace_back(
????????????[this]
????????????{
????????????????for(;;)
????????????????{
????????????????????std::functiontask;
????????????????????{
????????????????????????std::unique_locklock(this->queue_mutex);
????????????????????????this->condition.wait(lock,
????????????????????????????[this]{?return?this->stop?||?!this->tasks.empty();?});
????????????????????????if(this->stop?&&?this->tasks.empty())
????????????????????????????return;
????????????????????????task?=?std::move(this->tasks.front());
????????????????????????this->tasks.pop();
????????????????????}
????????????????????task();
????????????????}
????????????}
????????);
}
//?add?new?work?item?to?the?pool
templateauto?ThreadPool::enqueue(F&&?f,?Args&&...?args)?
????->?std::future<typename?std::result_of::type>
{
????using?return_type?=?typename?std::result_of::type;
????auto?task?=?std::make_shared<?std::packaged_task>(
????????????std::bind(std::forward(f),?std::forward(args)...)
????????);
????????
????std::futureres?=?task->get_future();
????{
????????std::unique_locklock(queue_mutex);
????????//?don't?allow?enqueueing?after?stopping?the?pool
????????if(stop)
????????????throw?std::runtime_error("enqueue?on?stopped?ThreadPool");
????????tasks.emplace([task](){?(*task)();?});
????}
????condition.notify_one();
????return?res;
}
//?the?destructor?joins?all?threads
inline?ThreadPool::~ThreadPool()
{
????{
????????std::unique_locklock(queue_mutex);
????????stop?=?true;
????}
????condition.notify_all();
????for(std::thread?&worker:?workers)
????????worker.join();
}
#endif基本使用方法
#include#include?"ThreadPool.h"
int?main()
{
????//?create?thread?pool?with?4?worker?threads
????ThreadPool?pool(4);
????//?enqueue?and?store?future
????auto?result?=?pool.enqueue([](int?answer)?{?return?answer;?},?42);
????//?get?result?from?future,?print?42
????std::cout?<<?result.get()?<<?std::endl;?
}另一個例子
#include#include?"ThreadPool.h"
void?func()
{
????std::this_thread::sleep_for(std::chrono::milliseconds(100));
????std::cout<<"worker?thread?ID:"<<std::this_thread::get_id()<<std::endl;
}
int?main()
{
????ThreadPool?pool(4);
????while(1)
????{
???????pool.enqueue(fun);
????}
}可以看出,四個線程都在運(yùn)行。但是如果把func()中的延時放在main()的while循環(huán)中,就只有一個線程在運(yùn)行了。
線程池,最簡單的就是生產(chǎn)者消費(fèi)者模型了。池里的每條線程,都是消費(fèi)者,他們消費(fèi)并處理一個個的任務(wù),而任務(wù)隊(duì)列就相當(dāng)于生產(chǎn)者了。
線程池最簡單的形式是含有一個固定數(shù)量的工作線程來處理任務(wù),典型的數(shù)量是std::thread::hardware_concurrency()





