日本黄色一级经典视频|伊人久久精品视频|亚洲黄色色周成人视频九九九|av免费网址黄色小短片|黄色Av无码亚洲成年人|亚洲1区2区3区无码|真人黄片免费观看|无码一级小说欧美日免费三级|日韩中文字幕91在线看|精品久久久无码中文字幕边打电话

當前位置:首頁 > 單片機 > 程序喵大人


為什么要并發(fā)編程


大型的軟件項目常常包含非常多的任務(wù)需要處理。例如:對于大量數(shù)據(jù)的數(shù)據(jù)流處理,或者是包含復(fù)雜GUI界面的應(yīng)用程序。如果將所有的任務(wù)都以串行的方式執(zhí)行,則整個系統(tǒng)的效率將會非常低下,應(yīng)用程序的用戶體驗會非常的差。
另一方面,自上個世紀六七十年代英特爾創(chuàng)始人之一 Gordon Moore 提出 摩爾定義 以來,CPU頻率以每18個月翻一番的指數(shù)速度增長。但這一增長在最近的十年已經(jīng)基本停滯,大家會發(fā)現(xiàn)曾經(jīng)有過一段時間CPU的頻率從3G到達4G,但在這之后就停滯不前了。因此最近的新款CPU也基本上都是3G左右的頻率。相應(yīng)的,CPU以更多核的形式在增長。目前的Intel i7有8核的版本,Xeon處理器達到了28核。并且,最近幾年手機上使用的CPU也基本上是4核或者8核的了。
由此,掌握并發(fā)編程技術(shù),利用多處理器來提升軟件項目的性能將是軟件工程師的一項基本技能。
本文以C++語言為例,講解如何進行并發(fā)編程。并盡可能涉及C++11,C++14以及C++17中的主要內(nèi)容。

并發(fā)與并行


并發(fā)(Concurrent)與并行(Parallel)都是很常見的術(shù)語。
Erlang之父Joe Armstrong曾經(jīng)以人們使用咖啡機的場景為例描述了這兩個術(shù)語。如下圖所示:

  • 并發(fā):如果多個隊列可以交替使用某臺咖啡機,則這一行為就是并發(fā)的。
  • 并行:如果存在多臺咖啡機可以被多個隊列交替使用,則就是并行。

這里隊列中的每個人類比于計算機的任務(wù),咖啡機類比于計算機處理器。因此:并發(fā)和并行都是在多任務(wù)的環(huán)境下的討論。
更嚴格的來說:如果一個系統(tǒng)支持多個動作同時存在,那么這個系統(tǒng)就是一個并發(fā)系統(tǒng)。如果這個系統(tǒng)還支持多個動作(物理時間上)同時執(zhí)行,那么這個系統(tǒng)就是一個并行系統(tǒng)。
你可能已經(jīng)看出,“并行”其實是“并發(fā)”的子集。它們的區(qū)別在于是否具有多個處理器。如果存在多個處理器同時執(zhí)行多個線程,就是并行。
在不考慮處理器數(shù)量的情況下,我們統(tǒng)稱之為“并發(fā)”。

進程與線程


進程與線程是操作系統(tǒng)的基本概念。無論是桌面系統(tǒng):MacOS,Linux,Windows,還是移動操作系統(tǒng):Android,iOS,都存在進程和線程的概念。
進程(英語:process),是指計算機中已運行的程序。進程為曾經(jīng)是分時系統(tǒng)的基本運作單位。在面向進程設(shè)計的系統(tǒng)(如早期的UNIX,Linux 2.4及更早的版本)中,進程是程序的基本執(zhí)行實體;線程(英語:thread)是操作系統(tǒng)能夠進行運算調(diào)度的最小單位。它被包含在進程之中,是進程中的實際運作單位。-- 維基百科

關(guān)于這兩個概念在任何一本操作系統(tǒng)書上都可以找到定義。網(wǎng)上也有很多文章對它們進行了解釋。因此這里不再贅述,這里僅僅提及一下它們與編程的關(guān)系。
對于絕大部分編程語言或者編程環(huán)境來說,我們所寫的程序都會在一個進程中運行。一個進程至少會包含一個線程。這個線程我們通常稱之為主線程。
在默認的情況下,我們寫的代碼都是在進程的主線程中運行,除非開發(fā)者在程序中創(chuàng)建了新的線程。
不同編程語言的線程環(huán)境會不一樣,Java語言在很早就支持了多線程接口。(Java程序在Java虛擬機中運行,虛擬機通常還會包含自己特有的線程,例如垃圾回收線程。)。而對于JavaScript這樣的語言來說,它就沒有多線程的概念。
當我們只有一個處理器時,所有的進程或線程會分時占用這個處理器。但如果系統(tǒng)中存在多個處理器時,則就可能有多個任務(wù)并行的運行在不同的處理器上。
下面兩幅圖以不同顏色的矩形代表不同的任務(wù)(可能是進程,也可能是線程)來描述它們可能在處理器上執(zhí)行的順序。
下圖是單核處理器的情況:

下面是四核處理器的情況:

任務(wù)會在何時占有處理器,通常是由操作系統(tǒng)的調(diào)度策略決定的。在《Android系統(tǒng)上的進程管理:進程的調(diào)度》一文中,我們介紹過Linux的調(diào)度策略。
當我們在開發(fā)跨平臺的軟件時,我們不應(yīng)當對調(diào)度策略做任何假設(shè),而應(yīng)該抱有“系統(tǒng)可能以任意順序來調(diào)度我的任務(wù)”這樣的想法。

并發(fā)系統(tǒng)的性能


開發(fā)并發(fā)系統(tǒng)最主要的動機就是提升系統(tǒng)性能(事實上,這是以增加復(fù)雜度為代價的)。
但我們需要知道,單純的使用多線程并不一定能提升系統(tǒng)性能(當然,也并非線程越多系統(tǒng)的性能就越好)。從上面的兩幅圖我們就可以直觀的感受到:線程(任務(wù))的數(shù)量要根據(jù)具體的處理器數(shù)量來決定。假設(shè)只有一個處理器,那么劃分太多線程可能會適得其反。因為很多時間都花在任務(wù)切換上了。
因此,在設(shè)計并發(fā)系統(tǒng)之前,一方面我們需要做好對于硬件性能的了解,另一方面需要對我們的任務(wù)有足夠的認識。
關(guān)于這一點,你可能需要了解一下阿姆達爾定律了。對于這個定律,簡單來說:我們想要預(yù)先意識到那些任務(wù)是可以并行的,那些是無法并行的。只有明確了任務(wù)的性質(zhì),才能有的放矢的進行優(yōu)化。這個定律告訴了我們將系統(tǒng)并行之后性能收益的上限。
關(guān)于阿姆達爾定律在Linux系統(tǒng)監(jiān)測工具sysstat介紹一文中已經(jīng)介紹過,因此這里不再贅述。

C++與并發(fā)編程


前面我們已經(jīng)了解到,并非所有的語言都提供了多線程的環(huán)境。
即便是C++語言,直到C++11標準之前,也是沒有多線程支持的。在這種情況下,Linux/Unix平臺下的開發(fā)者通常會使用POSIX Threads,Windows上的開發(fā)者也會有相應(yīng)的接口。但很明顯,這些API都只針對特定的操作系統(tǒng)平臺,可移植性較差。如果要同時支持Linux和Windows系統(tǒng),你可能要寫兩套代碼。
相較而言,Java自JDK 1.0就包含了多線程模型。

這個狀態(tài)在C++ 11標準發(fā)布之后得到了改變。并且,在C++ 14和C++ 17標準中又對并發(fā)編程機制進行了增強。
下圖是最近幾個版本的C++標準特性的線路圖。


編譯器與C++標準


編譯器對于語言特性的支持是逐步完成的。想要使用特定的特性你需要相應(yīng)版本的編譯器。
  • GCC對于C++特性的支持請參見這里:C++ Standards Support in GCC。
  • Clang對于C++特性的支持請參見這里:C++ Support in Clang。

下面兩個表格列出了C++標準和相應(yīng)編譯器的版本對照:
  • C++標準與相應(yīng)的GCC版本要求如下:
  • C++標準與相應(yīng)的Clang版本要求如下:

默認情況下編譯器是以較低的標準來進行編譯的,如果希望使用新的標準,你需要通過編譯參數(shù)-std=c++xx告知編譯器,例如:
		
g++ -std=c++17 your_file.cpp -o your_program

測試環(huán)境


本文的源碼可以到下載我的github上獲取,地址:paulQuei/cpp-concurrency。你可以直接通過下面這條命令獲取源碼:
		
git clone https://github.com/paulQuei/cpp-concurrency.git

源碼下載之后,你可以通過任何文本編輯器瀏覽源碼。如果希望編譯和運行程序,你還需要按照下面的內(nèi)容來準備環(huán)境。
本文中的源碼使用cmake編譯,只有cmake 3.8以上的版本才支持C++ 17,所以你需要安裝這個或者更新版本的cmake。
另外,截止目前(2019年10月)為止,clang編譯器還不支持并行算法。
但是gcc-9是支持的。因此想要編譯和運行這部分代碼,你需要安裝gcc 9.0或更新的版本。并且,gcc-9還要依賴Intel Threading Building Blocks才能使用并行算法以及頭文件。
具體的安裝方法見下文。
具體編譯器對于C++特性支持的情況請參見這里:C++ compiler support。

安裝好之后運行根目錄下的下面這個命令即可:
 

		
./make_all.sh

它會完成所有的編譯工作。
本文的源碼在下面兩個環(huán)境中經(jīng)過測試,環(huán)境的準備方法如下。

MacOS


在Mac上,我使用brew工具安裝gcc以及tbb庫。
考慮到其他人與我的環(huán)境可能會有所差異,所以需要手動告知tbb庫的安裝路徑。讀者需要執(zhí)行下面這些命令來準備環(huán)境:

		
rew install gccbrew insbtall tbb export tbb_path=/usr/local/Cellar/tbb/2019_U8/./make_all.sh

注意,請通過運行g(shù)++-9命令以確認gcc的版本是否正確,如果版本較低,則需要通過brew命令將其升級到新版本:
		
brew upgrade gcc

Ubuntu


Ubuntu上,通過下面的命令安裝gcc-9。
		
sudo add-apt-repository ppa:ubuntu-toolchain-r/testsudo apt-get updatesudo apt install  gcc-9 g++-9

但安裝tbb庫就有些麻煩了。這是因為Ubuntu 16.04默認關(guān)聯(lián)的版本是較低的,直接安裝是無法使用的。我們需要安裝更新的版本。聯(lián)網(wǎng)安裝的方式步驟繁瑣,所以可以通過下載包的方式進行安裝,我已經(jīng)將這需要的兩個文件放到的這里:
  • libtbb2_2019~U8-1_amd64.deb
  • libtbb-dev_2019~U8-1_amd64.deb

如果需要,你可以下載后通過apt命令安裝即可:
		
sudo apt install ~/Downloads/libtbb2_2019~U8-1_amd64.deb sudo apt install ~/Downloads/libtbb-dev_2019~U8-1_amd64.deb

線程


創(chuàng)建線程


創(chuàng)建線程非常的簡單的,下面就是一個使用了多線程的Hello World示例:
		
// 01_hello_thread.cpp #include#include// ① using namespace std; // ② void hello() { // ③ cout << "Hello World from new thread." << endl;} int main() { thread t(hello); // ④ t.join(); // ⑤  return 0;}

對于這段代碼說明如下:
  1. 為了使用多線程的接口,我們需要#include頭文件。
  2. 為了簡化聲明,本文中的代碼都將using namespace std;。
  3. 新建線程的入口是一個普通的函數(shù),它并沒有什么特別的地方。
  4. 創(chuàng)建線程的方式就是構(gòu)造一個thread對象,并指定入口函數(shù)。與普通對象不一樣的是,此時編譯器便會為我們創(chuàng)建一個新的操作系統(tǒng)線程,并在新的線程中執(zhí)行我們的入口函數(shù)。
  5. 關(guān)于join函數(shù)在下文中講解。

thread可以和callable類型一起工作,因此如果你熟悉lambda表達式,你可以直接用它來寫線程的邏輯,像這樣:
		
// 02_lambda_thread.cpp #include#include using namespace std; int main() { thread t([] { cout << "Hello World from lambda thread." << endl; });  t.join();  return 0;}

為了減少不必要的重復(fù),若無必要,下文中的代碼將不貼出include指令以及using聲明。

當然,你可以傳遞參數(shù)給入口函數(shù),像下面這樣:
		
// 03_thread_argument.cpp void hello(string name) { cout << "Welcome to " << name << endl;} int main() { thread t(hello, "https://paul.pub"); t.join();  return 0;}

不過需要注意的是,參數(shù)是以拷貝的形式進行傳遞的。因此對于拷貝耗時的對象你可能需要傳遞指針或者引用類型作為參數(shù)。但是,如果是傳遞指針或者引用,你還需要考慮參數(shù)對象的生命周期。因為線程的運行長度很可能會超過參數(shù)的生命周期(見下文detach),這個時候如果線程還在訪問一個已經(jīng)被銷毀的對象就會出現(xiàn)問題。

join與detach


  • 主要API

一旦啟動線程之后,我們必須決定是要等待直接它結(jié)束(通過join),還是讓它獨立運行(通過detach),我們必須二者選其一。如果在thread對象銷毀的時候我們還沒有做決定,則thread對象在析構(gòu)函數(shù)出將調(diào)用std::terminate()從而導(dǎo)致我們的進程異常退出。
請思考在上面的代碼示例中,thread對象在何時會銷毀。

需要注意的是:在我們做決定的時候,很可能線程已經(jīng)執(zhí)行完了(例如上面的示例中線程的邏輯僅僅是一句打印,執(zhí)行時間會很短)。新的線程創(chuàng)建之后,究竟是新的線程先執(zhí)行,還是當前線程的下一條語句先執(zhí)行這是不確定的,因為這是由操作系統(tǒng)的調(diào)度策略決定的。不過這不要緊,我們只要在thread對象銷毀前做決定即可。
  • join:調(diào)用此接口時,當前線程會一直阻塞,直到目標線程執(zhí)行完成(當然,很可能目標線程在此處調(diào)用之前就已經(jīng)執(zhí)行完成了,不過這不要緊)。因此,如果目標線程的任務(wù)非常耗時,你就要考慮好是否需要在主線程上等待它了,因此這很可能會導(dǎo)致主線程卡住。
  • detach:detach是讓目標線程成為守護線程(daemon threads)。一旦detach之后,目標線程將獨立執(zhí)行,即便其對應(yīng)的thread對象銷毀也不影響線程的執(zhí)行。并且,你無法再與之通信。

對于這兩個接口,都必須是可執(zhí)行的線程才有意義。你可以通過joinable()接口查詢是否可以對它們進行join或者detach。

管理當前線程


  • 主要API

上面是一些在線程內(nèi)部使用的API,它們用來對當前線程做一些控制。
  • yield 通常用在自己的主要任務(wù)已經(jīng)完成的時候,此時希望讓出處理器給其他任務(wù)使用。
  • get_id 返回當前線程的id,可以以此來標識不同的線程。
  • sleep_for 是讓當前線程停止一段時間。
  • sleep_until 和sleep_for類似,但是是以具體的時間點為參數(shù)。這兩個API都以chrono API(由于篇幅所限,這里不展開這方面內(nèi)容)為基礎(chǔ)。

下面是一個代碼示例:
		
// 04_thread_self_manage.cpp void print_time() { auto now = chrono::system_clock::now(); auto in_time_t = chrono::system_clock::to_time_t(now);  std::stringstream ss; ss << put_time(localtime(&in_time_t), "%Y-%m-%d %X"); cout << "now is: " << ss.str() << endl;} void sleep_thread() { this_thread::sleep_for(chrono::seconds(3)); cout << "[thread-" << this_thread::get_id() << "] is waking up" << endl;} void loop_thread() { for (int i = 0; i < 10; i++) { cout << "[thread-" << this_thread::get_id() << "] print: " << i << endl; }} int main() { print_time();  thread t1(sleep_thread); thread t2(loop_thread);  t1.join(); t2.detach();  print_time(); return 0;}

這段代碼應(yīng)該還是比較容易理解的,這里創(chuàng)建了兩個線程。它們都會有一些輸出,其中一個會先停止3秒鐘,然后再輸出。主線程調(diào)用join會一直卡住等待它運行結(jié)束。這段程序的輸出如下:
		
now is: 2019-10-13 10:17:48[thread-0x70000cdda000] print: 0[thread-0x70000cdda000] print: 1[thread-0x70000cdda000] print: 2[thread-0x70000cdda000] print: 3[thread-0x70000cdda000] print: 4[thread-0x70000cdda000] print: 5[thread-0x70000cdda000] print: 6[thread-0x70000cdda000] print: 7[thread-0x70000cdda000] print: 8[thread-0x70000cdda000] print: 9[thread-0x70000cd57000] is waking upnow is: 2019-10-13 10:17:51

一次調(diào)用


  • 主要API

在一些情況下,我們有些任務(wù)需要執(zhí)行一次,并且我們只希望它執(zhí)行一次,例如資源的初始化任務(wù)。這個時候就可以用到上面的接口。這個接口會保證,即便在多線程的環(huán)境下,相應(yīng)的函數(shù)也只會調(diào)用一次。
下面就是一個示例:有三個線程都會使用init函數(shù),但是只會有一個線程真正執(zhí)行它。
		
// 05_call_once.cpp void init() { cout << "Initialing..." << endl; // Do something...} void worker(once_flag* flag) { call_once(*flag, init);} int main() { once_flag flag;  thread t1(worker, &flag); thread t2(worker, &flag); thread t3(worker, &flag);  t1.join(); t2.join(); t3.join();  return 0;}

我們無法確定具體是哪一個線程會執(zhí)行init。而事實上,我們也不關(guān)心,因為只要有某個線程完成這個初始化工作就可以了。
請思考一下,為什么要在main函數(shù)中創(chuàng)建once_flag flag。如果是在worker函數(shù)中直接聲明一個once_flag并使用行不行?為什么?


并發(fā)任務(wù)


下面以一個并發(fā)任務(wù)為示例講解如何引入多線程。
任務(wù)示例:現(xiàn)在假設(shè)我們需要計算某個范圍內(nèi)所有自然數(shù)的平方根之和,例如[1, 10e8]。
在單線程模型下,我們的代碼可能是這樣的:
		
// 06_naive_multithread.cpp static const int MAX = 10e8; // ①static double sum = 0; // ② void worker(int min, int max) { // ③ for (int i = min; i <= max; i++) { sum += sqrt(i); }} void serial_task(int min, int max) { // ④ auto start_time = chrono::steady_clock::now(); sum = 0; worker(0, MAX); auto end_time = chrono::steady_clock::now(); auto ms = chrono::duration_cast(end_time - start_time).count(); cout << "Serail task finish, " << ms << " ms consumed, Result: " << sum << endl;}

這段代碼說明如下:
  1. 通過一個常量指定數(shù)據(jù)范圍,這個是為了方便調(diào)整。
  2. 通過一個全局變量來存儲結(jié)果。
  3. 通過一個任務(wù)函數(shù)來計算值。
  4. 統(tǒng)計任務(wù)的執(zhí)行時間。

這段程序輸出如下:
		
Serail task finish, 6406 ms consumed, Result: 2.10819e+13

很顯然,上面單線程的做法性能太差了。我們的任務(wù)完全是可以并發(fā)執(zhí)行的。并且任務(wù)很容易劃分。
下面我們就嘗試以多線程的方式來改造原先的程序。
改造后的程序如下:
		
// 06_naive_multithread.cpp void concurrent_task(int min, int max) { auto start_time = chrono::steady_clock::now();  unsigned concurrent_count = thread::hardware_concurrency(); // ① cout << "hardware_concurrency: " << concurrent_count << endl; vectorthreads; min = 0; sum = 0; for (int t = 0; t < concurrent_count; t++) { // ② int range = max / concurrent_count * (t + 1); threads.push_back(thread(worker, min, range)); // ③ min = range + 1; } for (auto& t : threads) { t.join(); // ④ }  auto end_time = chrono::steady_clock::now(); auto ms = chrono::duration_cast(end_time - start_time).count(); cout << "Concurrent task finish, " << ms << " ms consumed, Result: " << sum << endl;}

這段代碼說明如下:
  1. thread::hardware_concurrency()可以獲取到當前硬件支持多少個線程并行執(zhí)行。
  2. 根據(jù)處理器的情況決定線程的數(shù)量。
  3. 對于每一個線程都通過worker函數(shù)來完成任務(wù),并劃分一部分數(shù)據(jù)給它處理。
  4. 等待每一個線程執(zhí)行結(jié)束。

很好,似乎很簡單就完成了并發(fā)的改造。然后我們運行一下這個程序:
		
hardware_concurrency: 16Concurrent task finish, 6246 ms consumed, Result: 1.78162e+12

很抱歉,我們會發(fā)現(xiàn)這里的性能并沒有明顯的提升。更嚴重的是,這里的結(jié)果是錯誤的。
要搞清楚為什么結(jié)果不正確我們需要更多的背景知識。
我們知道,對于現(xiàn)代的處理器來說,為了加速處理的速度,每個處理器都會有自己的高速緩存(Cache),這個高速緩存是與每個處理器相對應(yīng)的,如下圖所示:
事實上,目前大部分CPU的緩存已經(jīng)不只一層。


處理器在進行計算的時候,高速緩存會參與其中,例如數(shù)據(jù)的讀和寫。而高速緩存和系統(tǒng)主存(Memory)是有可能存在不一致的。即:某個結(jié)果計算后保存在處理器的高速緩存中了,但是沒有同步到主存中,此時這個值對于其他處理器就是不可見的。
事情還遠不止這么簡單。我們對于全局變量值的修改:sum += sqrt(i);這條語句,它并非是原子的。它其實是很多條指令的組合才能完成。假設(shè)在某個設(shè)備上,這條語句通過下面這幾個步驟來完成。它們的時序可能如下所示:

在時間點a的時候,所有線程對于sum變量的值是一致的。
但是在時間點b之后,thread3上已經(jīng)對sum進行了賦值。而這個時候其他幾個線程也同時在其他處理器上使用了這個值,那么這個時候它們所使用的值就是舊的(錯誤的)。最后得到的結(jié)果也自然是錯的。

競爭條件與臨界區(qū)


當多個進程或者線程同時訪問共享數(shù)據(jù)時,只要有一個任務(wù)會修改數(shù)據(jù),那么就可能會發(fā)生問題。此時結(jié)果依賴于這些任務(wù)執(zhí)行的相對時間,這種場景稱為競爭條件(race condition)。
訪問共享數(shù)據(jù)的代碼片段稱之為臨界區(qū)(critical section)。具體到上面這個示例,臨界區(qū)就是讀寫sum變量的地方。
要避免競爭條件,就需要對臨界區(qū)進行數(shù)據(jù)保護。
很自然的,現(xiàn)在我們能夠理解發(fā)生競爭條件是因為這些線程在同時訪問共享數(shù)據(jù),其中有些線程的改動沒有讓其他線程知道,導(dǎo)致其他線程在錯誤的基礎(chǔ)上進行處理,結(jié)果自然也就是錯誤的。
那么,如果一次只讓一個線程訪問共享數(shù)據(jù),訪問完了再讓其他線程接著訪問,這樣就可以避免問題的發(fā)生了。
接下來介紹的API提供的就是這樣的功能。

互斥體與鎖


mutex


開發(fā)并發(fā)系統(tǒng)的目的主要是為了提升性能:將任務(wù)分散到多個線程,然后在不同的處理器上同時執(zhí)行。這些分散開來的線程通常會包含兩類任務(wù):
  1. 獨立的對于劃分給自己的數(shù)據(jù)的處理
  2. 對于處理結(jié)果的匯總

其中第1項任務(wù)因為每個線程是獨立的,不存在競爭條件的問題。而第2項任務(wù),由于所有線程都可能往總結(jié)果(例如上面的sum變量)匯總,這就需要做保護了。在某一個具體的時刻,只應(yīng)當有一個線程更新總結(jié)果,即:保證每個線程對于共享數(shù)據(jù)的訪問是“互斥”的。mutex 就提供了這樣的功能。
mutex是mutual exclusion(互斥)的簡寫。
  • 主要API

很明顯,在這些類中,mutex是最基礎(chǔ)的API。其他類都是在它的基礎(chǔ)上的改進。所以這些類都提供了下面三個方法,并且它們的功能是一樣的:
| 方法| 說明 |
| lock|鎖定互斥體,如果不可用,則阻塞 |
| try_lock |嘗試鎖定互斥體,如果不可用,直接返回 |
|unlock | 解鎖互斥體|

這三個方法提供了基礎(chǔ)的鎖定和解除鎖定的功能。使用lock意味著你有很強的意愿一定要獲取到互斥體,而使用try_lock則是進行一次嘗試。這意味著如果失敗了,你通常還有其他的路徑可以走。
在這些基礎(chǔ)功能之上,其他的類分別在下面三個方面進行了擴展:
  • 超時:timed_mutex,recursive_timed_mutex,shared_timed_mutex名稱都帶有timed,這意味著它們都支持超時功能。它們都提供了try_lock_for和try_lock_until方法,這兩個方法分別可以指定超時的時間長度和時間點。如果在超時的時間范圍內(nèi)沒有能獲取到鎖,則直接返回,不再繼續(xù)等待。
  • 可重入:recursive_mutex和recursive_timed_mutex的名稱都帶有recursive??芍厝牖蛘呓凶隹蛇f歸,是指在同一個線程中,同一把鎖可以鎖定多次。這就避免了一些不必要的死鎖。
  • 共享:shared_timed_mutex和shared_mutex提供了共享功能。對于這類互斥體,實際上是提供了兩把鎖:一把是共享鎖,一把是互斥鎖。一旦某個線程獲取了互斥鎖,任何其他線程都無法再獲取互斥鎖和共享鎖;但是如果有某個線程獲取到了共享鎖,其他線程無法再獲取到互斥鎖,但是還有獲取到共享鎖。這里互斥鎖的使用和其他的互斥體接口和功能一樣。而共享鎖可以同時被多個線程同時獲取到(使用共享鎖的接口見下面的表格)。共享鎖通常用在讀者寫者模型上。

使用共享鎖的接口如下:
| 方法| 說明 |
|lock_shared | 獲取互斥體的共享鎖,如果無法獲取則阻塞 |
| try_lock_shared| 嘗試獲取共享鎖,如果不可用,直接返回 |
| unlock_shared| 解鎖共享鎖 |

接下來,我們就借助剛學(xué)到的mutex來改造我們的并發(fā)系統(tǒng),改造后的程序如下:
		
// 07_mutex_lock.cpp static const int MAX = 10e8;static double sum = 0; static mutex exclusive; void concurrent_worker(int min, int max) { for (int i = min; i <= max; i++) { exclusive.lock(); // ① sum += sqrt(i); exclusive.unlock(); // ② }} void concurrent_task(int min, int max) { auto start_time = chrono::steady_clock::now();  unsigned concurrent_count = thread::hardware_concurrency(); cout << "hardware_concurrency: " << concurrent_count << endl; vectorthreads; min = 0; sum = 0; for (int t = 0; t < concurrent_count; t++) { int range = max / concurrent_count * (t + 1); threads.push_back(thread(concurrent_worker, min, range)); // ③ min = range + 1; } for (int i = 0; i < threads.size(); i++) { threads[i].join(); }  auto end_time = chrono::steady_clock::now(); auto ms = chrono::duration_cast(end_time - start_time).count(); cout << "Concurrent task finish, " << ms << " ms consumed, Result: " << sum << endl;}

這里只有三個地方需要關(guān)注:
  1. 在訪問共享數(shù)據(jù)之前加鎖
  2. 訪問完成之后解鎖
  3. 在多線程中使用帶鎖的版本

執(zhí)行之后結(jié)果輸出如下:
		
hardware_concurrency: 16Concurrent task finish, 74232 ms consumed, Result: 2.10819e+13

這下結(jié)果是對了,但是我們卻發(fā)現(xiàn)這個版本比原先單線程的版本性能還要差很多。這是為什么?
這是因為加鎖和解鎖是有代價的,這里計算最耗時的地方在鎖里面,每次只能有一個線程串行執(zhí)行,相比于單線程模型,它不但是串行的,還增加了鎖的負擔(dān),因此就更慢了。
這就是為什么前面說多線程系統(tǒng)會增加系統(tǒng)的復(fù)雜度,而且并非多線程系統(tǒng)一定就有更好的性能。
不過,對于這里的問題是可以改進的。我們仔細思考一下:我們劃分給每個線程的數(shù)據(jù)其實是獨立的,對于數(shù)據(jù)的處理是耗時的,但其實這部分邏輯每個線程可以單獨處理,沒必要加鎖。只有在最后匯總數(shù)據(jù)的時候進行一次鎖保護就可以了。
于是我們改造concurrent_worker,像下面這樣:
		
// 08_improved_mutex_lock.cpp void concurrent_worker(int min, int max) { double tmp_sum = 0; for (int i = min; i <= max; i++) { tmp_sum += sqrt(i); // ① } exclusive.lock(); // ② sum += tmp_sum; exclusive.unlock();}

這段代碼的改變在于兩處:
  1. 通過一個局部變量保存當前線程的處理結(jié)果
  2. 在匯總總結(jié)過的時候進行鎖保護

運行一下改進后的程序,其結(jié)果輸出如下:
		
hardware_concurrency: 16Concurrent task finish, 451 ms consumed, Result: 2.10819e+13

可以看到,性能一下就提升了好多倍。我們終于體驗到多線程帶來的好處了。
我們用鎖的粒度(granularity)來描述鎖的范圍。細粒度(fine-grained)是指鎖保護較小的范圍,粗粒度(coarse-grained)是指鎖保護較大的范圍。出于性能的考慮,我們應(yīng)該保證鎖的粒度盡可能的細。并且,不應(yīng)該在獲取鎖的范圍內(nèi)執(zhí)行耗時的操作,例如執(zhí)行IO。如果是耗時的運算,也應(yīng)該盡可能的移到鎖的外面。
In general, a lock should be held for only the minimum possible time needed to perform the required operations.--《C++ Concurrency in Action》


死鎖


死鎖是并發(fā)系統(tǒng)很常見的一類問題。
死鎖是指:兩個或以上的運算單元,每一方都在等待其他方釋放資源,但是所有方都不愿意釋放資源。結(jié)果是沒有任何一方能繼續(xù)推進下去,于是整個系統(tǒng)無法再繼續(xù)運轉(zhuǎn)。
死鎖在現(xiàn)實中也很常見,例如:兩個孩子分別拿著玩具的一半然后哭著要從對方手里得到另外一半玩具,但是誰都不肯讓步。
在成年人的世界里也會發(fā)生類似的情況,例如下面這個交通狀況:

下面我們來看一個編程示例。
現(xiàn)在假設(shè)我們在開發(fā)一個銀行的系統(tǒng),這個系統(tǒng)包含了轉(zhuǎn)賬的功能。
首先我們創(chuàng)建一個Account類來描述銀行賬號。由于這僅僅是一個演示使用的代碼,所以我們希望代碼足夠的簡單。Account類僅僅包含名稱和金額兩個字段。
另外,為了支持并發(fā),這個類包含了一個mutex對象,用來保護賬號金額,在讀寫賬號金額時需要先加鎖保護。
		
// 09_deadlock_bank_transfer.cpp class Account {public: Account(string name, double money): mName(name), mMoney(money) {}; public: void changeMoney(double amount) { mMoney += amount; } string getName() { return mName; } double getMoney() { return mMoney; } mutex* getLock() { return &mMoneyLock; } private: string mName; double mMoney; mutex mMoneyLock;};

Account類很簡單,我想就不用多做說明了。
接下來,我們再創(chuàng)建一個描述銀行的Bank類。
		
// 09_deadlock_bank_transfer.cpp class Bank {public: void addAccount(Account* account) { mAccounts.insert(account); }  bool transferMoney(Account* accountA, Account* accountB, double amount) { lock_guard guardA(*accountA->getLock()); // ① lock_guard guardB(*accountB->getLock());  if (amount > accountA->getMoney()) { // ② return false; }  accountA->changeMoney(-amount); // ③ accountB->changeMoney(amount); return true; }  double totalMoney() const { double sum = 0; for (auto a : mAccounts) { sum += a->getMoney(); } return sum; } private: set mAccounts;};

銀行類中記錄了所有的賬號,并且提供了一個方法用來查詢整個銀行的總金額。這其中,我們最主要要關(guān)注轉(zhuǎn)賬的實現(xiàn):transferMoney。該方法的幾個關(guān)鍵點如下:
  1. 為了保證線程安全,在修改每個賬號之前,需要獲取相應(yīng)的鎖。
  2. 判斷轉(zhuǎn)出賬戶金額是否足夠,如果不夠此次轉(zhuǎn)賬失敗。
  3. 進行轉(zhuǎn)賬。

有了銀行和賬戶結(jié)構(gòu)之后就可以開發(fā)轉(zhuǎn)賬系統(tǒng)了,同樣的,由于是為了演示所用,我們的轉(zhuǎn)賬系統(tǒng)也會盡可能的簡單:
		
// 09_deadlock_bank_transfer.cpp void randomTransfer(Bank* bank, Account* accountA, Account* accountB) { while(true) { double randomMoney = ((double)rand() / RAND_MAX) * 100; if (bank->transferMoney(accountA, accountB, randomMoney)) { cout << "Transfer " << randomMoney << " from " << accountA->getName() << " to " << accountB->getName() << ", Bank totalMoney: " << bank->totalMoney() << endl; } else { cout << "Transfer failed, " << accountA->getName() << " has only $" << accountA->getMoney() << ", but " << randomMoney << " required" << endl; } }}

這里每次生成一個隨機數(shù),然后通過銀行進行轉(zhuǎn)賬。
最后我們在main函數(shù)中創(chuàng)建兩個線程,互相在兩個賬號之間來回轉(zhuǎn)賬:
		
// 09_deadlock_bank_transfer.cpp int main() { Account a("Paul", 100); Account b("Moira", 100);  Bank aBank; aBank.addAccount(&a); aBank.addAccount(&b);  thread t1(randomTransfer, &aBank, &a, &b); thread t2(randomTransfer, &aBank, &b, &a);  t1.join(); t2.join();  return 0;}

至此,我們的銀行轉(zhuǎn)賬系統(tǒng)就開發(fā)完成了。然后編譯并運行,其結(jié)果可能像下面這樣:
		
...Transfer 13.2901 from Paul to Moira, Bank totalMoney: 20042.6259 from Moira to Paul, Bank totalMoney: 200Transfer failed, Moira has only $34.7581, but 66.3208 requiredTransfer failed, Moira has only $34.7581, but Transfer 93.191 from 53.9176 requiredTransfer 60.6146 from Moira to Paul, Bank totalMoney: 200Transfer 49.7304 from Moira to Paul, Bank totalMoney: 200Paul to Moira, Bank totalMoney: Transfer failed, Moira has only $17.6041, but 18.1186 requiredTransfer failed, Moira has only $17.6041, but 18.893 requiredTransfer failed, Moira has only $17.6041, but 34.7078 requiredTransfer failed, Moira has only $17.6041, but 33.9569 requiredTransfer 12.7899 from 200Moira to Paul, Bank totalMoney: 200Transfer failed, Moira has only $63.9373, but 80.9038 requiredTransfer 50.933 from Moira to Paul, Bank totalMoney: 200Transfer failed, Moira has only $13.0043, but 30.2056 requiredTransfer failed, Moira has only $Transfer 59.123 from Paul to Moira, Bank totalMoney: 200Transfer 29.0486 from Paul to Moira, Bank totalMoney: 20013.0043, but 64.7307 required

如果你運行了這個程序,你會發(fā)現(xiàn)很快它就卡住不動了。為什么?
因為發(fā)生了死鎖。
我們仔細思考一下這兩個線程的邏輯:這兩個線程可能會同時獲取其中一個賬號的鎖,然后又想獲取另外一個賬號的鎖,此時就發(fā)生了死鎖。如下圖所示:

當然,發(fā)生死鎖的原因遠不止上面這一種情況。如果兩個線程互相join就可能發(fā)生死鎖。還有在一個線程中對一個不可重入的互斥體(例如mutex而非recursive_mutex)多次加鎖也會死鎖。
你可能會覺得,我可不會這么傻,寫出這樣的代碼。但實際上,很多時候是由于代碼的深層次嵌套導(dǎo)致了死鎖的發(fā)生,由于調(diào)用關(guān)系的復(fù)雜導(dǎo)致發(fā)現(xiàn)這類問題并不容易。
如果仔細看一下上面的輸出,我們會發(fā)現(xiàn)還有另外一個問題:這里的輸出是亂的。兩個線程的輸出混雜在一起了。究其原因也很容易理解:兩個線程可能會同時輸出,沒有做好隔離。
下面我們就來逐步解決上面的問題。
對于輸出混亂的問題很好解決,專門用一把鎖來保護輸出邏輯即可:
		
// 10_improved_bank_transfer.cpp mutex sCoutLock;void randomTransfer(Bank* bank, Account* accountA, Account* accountB) { while(true) { double randomMoney = ((double)rand() / RAND_MAX) * 100; if (bank->transferMoney(accountA, accountB, randomMoney)) { sCoutLock.lock(); cout << "Transfer " << randomMoney << " from " << accountA->getName() << " to " << accountB->getName() << ", Bank totalMoney: " << bank->totalMoney() << endl; sCoutLock.unlock(); } else { sCoutLock.lock(); cout << "Transfer failed, " << accountA->getName() << " has only " << accountA->getMoney() << ", but " << randomMoney << " required" << endl; sCoutLock.unlock(); } }}

請思考一下兩處lock和unlock調(diào)用,并考慮為什么不在while(true)下面寫一次整體的加鎖和解鎖。


通用鎖定算法


  • 主要API
要避免死鎖,需要仔細的思考和設(shè)計業(yè)務(wù)邏輯。
有一個比較簡單的原則可以避免死鎖,即:對所有的鎖進行排序,每次一定要按照順序來獲取鎖,不允許亂序。例如:要獲取某個玩具,一定要先拿到鎖A,再拿到鎖B,才能玩玩具。這樣就不會死鎖了。
這個原則雖然簡單,但卻不容易遵守。因為數(shù)據(jù)常常是分散在很多地方的。
不過好消息是,C++ 11標準中為我們提供了一些工具來避免因為多把鎖而導(dǎo)致的死鎖。我們只要直接調(diào)用這些接口就可以了。這個就是上面提到的兩個函數(shù)。它們都支持傳入多個Lockable對象。
接下來我們用它來改造之前死鎖的轉(zhuǎn)賬系統(tǒng):
		
// 10_improved_bank_transfer.cpp bool transferMoney(Account* accountA, Account* accountB, double amount) { lock(*accountA->getLock(), *accountB->getLock()); // ① lock_guard lockA(*accountA->getLock(), adopt_lock); // ② lock_guard lockB(*accountB->getLock(), adopt_lock); // ③  if (amount > accountA->getMoney()) { return false; }  accountA->changeMoney(-amount); accountB->changeMoney(amount); return true;}

這里只改動了3行代碼。
  1. 這里通過lock函數(shù)來獲取兩把鎖,標準庫的實現(xiàn)會保證不會發(fā)生死鎖。
  2. lock_guard在下面我們還會詳細介紹。這里只要知道它會在自身對象生命周期的范圍內(nèi)鎖定互斥體即可。創(chuàng)建lock_guard的目的是為了在transferMoney結(jié)束的時候釋放鎖,lockB也是一樣。但需要注意的是,這里傳遞了 adopt_lock表示:現(xiàn)在是已經(jīng)獲取到互斥體了的狀態(tài)了,不用再次加鎖(如果不加adopt_lock就是二次鎖定了)。

運行一下這個改造后的程序,其輸出如下所示:
		
...Transfer failed, Paul has only $1.76243, but 17.5974 requiredTransfer failed, Paul has only $1.76243, but 59.2104 requiredTransfer failed, Paul has only $1.76243, but 49.6379 requiredTransfer failed, Paul has only $1.76243, but 63.6373 requiredTransfer failed, Paul has only $1.76243, but 51.8742 requiredTransfer failed, Paul has only $1.76243, but 50.0081 requiredTransfer failed, Paul has only $1.76243, but 86.1041 requiredTransfer failed, Paul has only $1.76243, but 51.3278 requiredTransfer failed, Paul has only $1.76243, but 66.5754 requiredTransfer failed, Paul has only $1.76243, but 32.1867 requiredTransfer failed, Paul has only $1.76243, but 62.0039 requiredTransfer failed, Paul has only $1.76243, but 98.7819 requiredTransfer failed, Paul has only $1.76243, but 27.046 requiredTransfer failed, Paul has only $1.76243, but 62.9155 requiredTransfer 98.8478 from Moira to Paul, Bank totalMoney: 200Transfer 80.0722 from Moira to Paul, Bank totalMoney: 200Transfer 73.7035 from Moira to Paul, Bank totalMoney: 200Transfer 34.4476 from Moira to Paul, Bank totalMoney: 200Transfer failed, Moira has only $10.0142, but 61.3033 requiredTransfer failed, Moira has only $10.0142, but 24.5595 required...

現(xiàn)在這個轉(zhuǎn)賬程序會一直運行下去,不會再死鎖了。輸出也是正常的了。

通用互斥管理


  • 主要API

互斥體(mutex相關(guān)類)提供了對于資源的保護功能,但是手動的鎖定(調(diào)用lock或者try_lock)和解鎖(調(diào)用unlock)互斥體是要耗費比較大的精力的,我們需要精心考慮和設(shè)計代碼才行。因為我們需要保證,在任何情況下,解鎖要和加鎖配對,因為假設(shè)出現(xiàn)一條路徑導(dǎo)致獲取鎖之后沒有正常釋放,就會影響整個系統(tǒng)。如果考慮方法還可以會拋出異常,這樣的代碼寫起來會很費勁。
鑒于這個原因,標準庫就提供了上面的這些API。它們都使用了叫做RAII的編程技巧,來簡化我們手動加鎖和解鎖的“體力活”。
請看下面的例子
		
// https://en.cppreference.com/w/cpp/thread/lock_guard #include #include #include  int g_i = 0;std::mutex g_i_mutex; // ① void safe_increment(){ std::lock_guard<std::mutex> lock(g_i_mutex); // ② ++g_i;  std::cout << std::this_thread::get_id() << ": " << g_i << '\n'; // ③} int main(){ std::cout << "main: " << g_i << '\n';  std::thread t1(safe_increment); // ④ std::thread t2(safe_increment);  t1.join(); t2.join();  std::cout << "main: " << g_i << '\n';}

這段代碼中:
  1. 全局的互斥體g_i_mutex用來保護全局變量g_i
  2. 這是一個設(shè)計為可以被多線程環(huán)境使用的方法。因此需要通過互斥體來進行保護。這里沒有調(diào)用lock方法,而是直接使用lock_guard來鎖定互斥體。
  3. 在方法結(jié)束的時候,局部變量std::lock_guardlock會被銷毀,它對互斥體的鎖定也就解除了。
  4. 在多個線程中使用這個方法。


RAII


上面的幾個類(lock_guard,unique_lock,shared_lock,scoped_lock)都使用了一個叫做RAII的編程技巧。
RAII全稱是Resource Acquisition Is Initialization,直譯過來就是:資源獲取即初始化。
RAII是一種C++編程技術(shù),它將必須在使用前請求的資源(例如:分配的堆內(nèi)存、執(zhí)行線程、打開的套接字、打開的文件、鎖定的互斥體、磁盤空間、數(shù)據(jù)庫連接等——任何存在受限供給中的事物)的生命周期與一個對象的生存周期相綁定。
RAII保證資源可用于任何會訪問該對象的函數(shù)。它亦保證所有資源在其控制對象的生存期結(jié)束時,以獲取順序的逆序釋放。類似地,若資源獲取失?。?gòu)造函數(shù)以異常退出),則為已構(gòu)造完成的對象和基類子對象所獲取的所有資源,會以初始化順序的逆序釋放。這有效地利用了語言特性以消除內(nèi)存泄漏并保證異常安全。

RAII 可總結(jié)如下:
  • 將每個資源封裝入一個類,其中:
    • 構(gòu)造函數(shù)請求資源,并建立所有類不變式,或在它無法完成時拋出異常,
    • 析構(gòu)函數(shù)釋放資源并決不拋出異常;
  • 始終經(jīng)由 RAII 類的實例使用滿足要求的資源,該資源
    • 自身擁有自動存儲期或臨時生存期,或
    • 具有與自動或臨時對象的生存期綁定的生存期

回想一下上文中的transferMoney方法中的三行代碼:
lock(*accountA->getLock(), *accountB->getLock());lock_guard lockA(*accountA->getLock(), adopt_lock);lock_guard lockB(*accountB->getLock(), adopt_lock);
 
如果使用unique_lock這三行代碼還有一種等價的寫法:
		
unique_lock lockA(*accountA->getLock(), defer_lock);unique_lock lockB(*accountB->getLock(), defer_lock);lock(*accountA->getLock(), *accountB->getLock());

請注意這里lock方法的調(diào)用位置。這里先定義unique_lock指定了defer_lock,因此實際沒有鎖定互斥體,而是到第三行才進行鎖定。
最后,借助scoped_lock,我們可以將三行代碼合成一行,這種寫法也是等價的。
		
scoped_lock lockAll(*accountA->getLock(), *accountB->getLock());

scoped_lock會在其生命周期范圍內(nèi)鎖定互斥體,銷毀的時候解鎖。同時,它可以鎖定多個互斥體,并且避免死鎖。
目前,只還有shared_lock我們沒有提到。它與其他幾個類的區(qū)別在于:它是以共享的方式鎖定互斥體。

條件變量


| API | C++標準 | 說明 |
| condition_variable | C++ 11 | 提供與 std::unique_lock 關(guān)聯(lián)的條件變量 |
| condition_variable_any | C++ 11 |提供與任何鎖類型關(guān)聯(lián)的條件變量 |
| notify_all_at_thread_exit |C++ 11 | 安排到在此線程完全結(jié)束時對 notify_all 的調(diào)用 |
| cv_status | C++ 11 |列出條件變量上定時等待的可能結(jié)果 |

至此,我們還有一個地方可以改進。那就是:轉(zhuǎn)賬金額不足的時候,程序直接返回了false。這很難說是一個好的策略。因為,即便雖然當前賬號金額不足以轉(zhuǎn)賬,但只要別的賬號又轉(zhuǎn)賬進來之后,當前這個轉(zhuǎn)賬操作也許就可以繼續(xù)執(zhí)行了。
這在很多業(yè)務(wù)中是很常見的一個需求:每一次操作都要正確執(zhí)行,如果條件不滿足就停下來等待,直到條件滿足之后再繼續(xù)。而不是直接返回。
條件變量提供了一個可以讓多個線程間同步協(xié)作的功能。這對于生產(chǎn)者-消費者模型很有意義。在這個模型下:
  • 生產(chǎn)者和消費者共享一個工作區(qū)。這個區(qū)間的大小是有限的。
  • 生產(chǎn)者總是產(chǎn)生數(shù)據(jù)放入工作區(qū)中,當工作區(qū)滿了。它就停下來等消費者消費一部分數(shù)據(jù),然后繼續(xù)工作。
  • 消費者總是從工作區(qū)中拿出數(shù)據(jù)使用。當工作區(qū)中的數(shù)據(jù)全部被消費空了之后,它也會停下來等待生產(chǎn)者往工作區(qū)中放入新的數(shù)據(jù)。

從上面可以看到,無論是生產(chǎn)者還是消費者,當它們工作的條件不滿足時,它們并不是直接報錯返回,而是停下來等待,直到條件滿足。
下面我們就借助于條件變量,再次改造之前的銀行轉(zhuǎn)賬系統(tǒng)。
這個改造主要在于賬號類。我們重點是要調(diào)整changeMoney方法。
		
// 11_bank_transfer_wait_notify.cpp class Account {public: Account(string name, double money): mName(name), mMoney(money) {}; public: void changeMoney(double amount) { unique_lock lock(mMoneyLock); // ② mConditionVar.wait(lock, [this, amount] { // ③ return mMoney + amount > 0; // ④ }); mMoney += amount; mConditionVar.notify_all(); // ⑤ }  string getName() { return mName; }  double getMoney() { return mMoney; } private: string mName; double mMoney; mutex mMoneyLock; condition_variable mConditionVar; // ①};

這幾處改動說明如下:
  1. 這里聲明了一個條件變量,用來在多個線程之間協(xié)作。
  2. 這里使用的是unique_lock,這是為了與條件變量相配合。因為條件變量會解鎖和重新鎖定互斥體。
  3. 這里是比較重要的一個地方:通過條件變量進行等待。此時:會通過后面的lambda表達式判斷條件是否滿足。如果滿足則繼續(xù);如果不滿足,則此處會解鎖互斥體,并讓當前線程等待。解鎖這一點非常重要,因為只有這樣,才能讓其他線程獲取互斥體。
  4. 這里是條件變量等待的條件。如果你不熟悉lambda表達式,請自行網(wǎng)上學(xué)習(xí),或者閱讀我之前寫的文章。
  5. 此處也很重要。當金額發(fā)生變動之后,我們需要通知所有在條件變量上等待的其他線程。此時所有調(diào)用wait線程都會再次喚醒,然后嘗試獲取鎖(當然,只有一個能獲取到)并再次判斷條件是否滿足。除了notify_all還有notify_one,它只通知一個等待的線程。wait和notify就構(gòu)成了線程間互相協(xié)作的工具。

請注意:wait和notify_all雖然是寫在一個函數(shù)中的,但是在運行時它們是在多線程環(huán)境中執(zhí)行的,因此對于這段代碼,需要能夠從不同線程的角度去思考代碼的邏輯。這也是開發(fā)并發(fā)系統(tǒng)比較難的地方。
有了上面的改動之后,銀行的轉(zhuǎn)賬方法實現(xiàn)起來就很簡單了,不用再考慮數(shù)據(jù)保護的問題了:
		
// 11_bank_transfer_wait_notify.cpp void Bank::transferMoney(Account* accountA, Account* accountB, double amount) { accountA->changeMoney(-amount); accountB->changeMoney(amount);}

當然,轉(zhuǎn)賬邏輯也會變得簡單,不用再管轉(zhuǎn)賬失敗的情況發(fā)生。
		
// 11_bank_transfer_wait_notify.cpp mutex sCoutLock;void randomTransfer(Bank* bank, Account* accountA, Account* accountB) { while(true) { double randomMoney = ((double)rand() / RAND_MAX) * 100; { lock_guard guard(sCoutLock); cout << "Try to Transfer " << randomMoney << " from " << accountA->getName() << "(" << accountA->getMoney() << ") to " << accountB->getName() << "(" << accountB->getMoney() << "), Bank totalMoney: " << bank->totalMoney() << endl; } bank->transferMoney(accountA, accountB, randomMoney); }}

修改完之后的程序運行輸出如下:
		
...Try to Transfer 13.72 from Moira(10.9287) to Paul(189.071), Bank totalMoney: 200Try to Transfer 28.6579 from Paul(189.071) to Moira(10.9287), Bank totalMoney: 200Try to Transfer 91.8049 from Paul(160.413) to Moira(39.5866), Bank totalMoney: 200Try to Transfer 5.56383 from Paul(82.3285) to Moira(117.672), Bank totalMoney: 200Try to Transfer 11.3594 from Paul(76.7646) to Moira(123.235), Bank totalMoney: 200Try to Transfer 16.9557 from Paul(65.4053) to Moira(134.595), Bank totalMoney: 200Try to Transfer 74.998 from Paul(48.4495) to Moira(151.55), Bank totalMoney: 200Try to Transfer 65.3005 from Moira(151.55) to Paul(48.4495), Bank totalMoney: 200Try to Transfer 90.6084 from Moira(86.25) to Paul(113.75), Bank totalMoney: 125.002Try to Transfer 99.6425 from Moira(70.6395) to Paul(129.36), Bank totalMoney: 200Try to Transfer 55.2091 from Paul(129.36) to Moira(70.6395), Bank totalMoney: 200Try to Transfer 92.259 from Paul(74.1513) to Moira(125.849), Bank totalMoney: 200...
這下比之前都要好了。
但是細心的讀者會發(fā)現(xiàn),Bank totalMoney的輸出有時候是200,有時候不是。但不管怎樣,即便這一次不是,下一次又是了。關(guān)于這一點,請讀者自行思考一下為什么,以及如何改進。

future


這一小節(jié)中,我們來熟悉更多的可以在并發(fā)環(huán)境中使用的工具,它們都位于頭文件中。

async


很多語言都提供了異步的機制。異步使得耗時的操作不影響當前主線程的執(zhí)行流。
在C++11中,async便是完成這樣的功能的。下面是一個代碼示例:
		
// 12_async_task.cpp static const int MAX = 10e8;static double sum = 0; void worker(int min, int max) { for (int i = min; i <= max; i++) { sum += sqrt(i); }} int main() { sum = 0; auto f1 = async(worker, 0, MAX); cout << "Async task triggered" << endl; f1.wait(); cout << "Async task finish, result: " << sum << endl << endl;}

這仍然是我們之前熟悉的例子。這里有兩個地方需要說明:
  1. 這里以異步的方式啟動了任務(wù)。它會返回一個future對象。future用來存儲異步任務(wù)的執(zhí)行結(jié)果,關(guān)于future我們在后面packaged_task的例子中再詳細說明。在這個例子中我們僅僅用它來等待任務(wù)執(zhí)行完成。
  2. 此處是等待異步任務(wù)執(zhí)行完成。

需要注意的是,默認情況下,async是啟動一個新的線程,還是以同步的方式(不啟動新的線程)運行任務(wù),這一點標準是沒有指定的,由具體的編譯器決定。如果希望一定要以新的線程來異步執(zhí)行任務(wù),可以通過launch::async來明確說明。launch中有兩個常量:
  • async:運行新線程,以異步執(zhí)行任務(wù)。
  • deferred:調(diào)用方線程上第一次請求其結(jié)果時才執(zhí)行任務(wù),即惰性求值。

除了通過函數(shù)來指定異步任務(wù),還可以lambda表達式的方式來指定。如下所示:
 
		
// 12_async_task.cpp int main() {  double result = 0; cout << "Async task with lambda triggered, thread: " << this_thread::get_id() << endl; auto f2 = async(launch::async, [&result]() { cout << "Lambda task in thread: " << this_thread::get_id() << endl; for (int i = 0; i <= MAX; i++) { result += sqrt(i); } }); f2.wait(); cout << "Async task with lambda finish, result: " << result << endl << endl;  return 0;} 

在上面這段代碼中,我們使用一個lambda表達式來編寫異步任務(wù)的邏輯,并通過launch::async明確指定要通過獨立的線程來執(zhí)行任務(wù),同時我們打印出了線程的id。
這段代碼輸出如下:
		
Async task with lambda triggered, thread: 0x11290d5c0Lambda task in thread: 0x700007aa1000Async task with lambda finish, result: 2.10819e+13


對于面向?qū)ο缶幊虂碚f,很多時候肯定希望以對象的方法來指定異步任務(wù)。下面是一個示例:
		
// 12_async_task.cpp class Worker {public: Worker(int min, int max): mMin(min), mMax(max) {} // ① double work() { // ② mResult = 0; for (int i = mMin; i <= mMax; i++) { mResult += sqrt(i); } return mResult; } double getResult() { return mResult; } private: int mMin; int mMax; double mResult;}; int main() { Worker w(0, MAX); cout << "Task in class triggered" << endl; auto f3 = async(&Worker::work, &w); // ③ f3.wait(); cout << "Task in class finish, result: " << w.getResult() << endl << endl;  return 0;}

這段代碼有三處需要說明:
  1. 這里通過一個類來描述任務(wù)。這個類是對前面提到的任務(wù)的封裝。它包含了任務(wù)的輸入?yún)?shù),和輸出結(jié)果。
  2. work函數(shù)是任務(wù)的主體邏輯。
  3. 通過async執(zhí)行任務(wù):這里指定了具體的任務(wù)函數(shù)以及相應(yīng)的對象。請注意這里是&w,因此傳遞的是對象的指針。如果不寫&將傳入w對象的臨時復(fù)制。


packaged_task


在一些業(yè)務(wù)中,我們可能會有很多的任務(wù)需要調(diào)度。這時我們常常會設(shè)計出任務(wù)隊列和線程池的結(jié)構(gòu)。此時,就可以使用packaged_task來包裝任務(wù)。
如果你了解設(shè)計模式,你應(yīng)該會知道命令模式。

packaged_task綁定到一個函數(shù)或者可調(diào)用對象上。當它被調(diào)用時,它就會調(diào)用其綁定的函數(shù)或者可調(diào)用對象。并且,可以通過與之相關(guān)聯(lián)的future來獲取任務(wù)的結(jié)果。調(diào)度程序只需要處理packaged_task,而非各個函數(shù)。
packaged_task對象是一個可調(diào)用對象,它可以被封裝成一個std::fucntion,或者作為線程函數(shù)傳遞給std::thread,或者直接調(diào)用。
下面是一個代碼示例:
		
// 13_packaged_task.cpp double concurrent_worker(int min, int max) { double sum = 0; for (int i = min; i <= max; i++) { sum += sqrt(i); } return sum;} double concurrent_task(int min, int max) { vectordouble>> results; // ①  unsigned concurrent_count = thread::hardware_concurrency(); min = 0; for (int i = 0; i < concurrent_count; i++) { // ② packaged_task<double(int, int)> task(concurrent_worker); // ③ results.push_back(task.get_future()); // ④  int range = max / concurrent_count * (i + 1); thread t(std::move(task), min, range); // ⑤ t.detach();  min = range + 1; }  cout << "threads create finish" << endl; double sum = 0; for (auto& r : results) { sum += r.get(); ⑥ } return sum;} int main() { auto start_time = chrono::steady_clock::now();  double r = concurrent_task(0, MAX);  auto end_time = chrono::steady_clock::now(); auto ms = chrono::duration_cast(end_time - start_time).count(); cout << "Concurrent task finish, " << ms << " ms consumed, Result: " << r << endl; return 0;}

在這段代碼中:
  1. 首先創(chuàng)建一個集合來存儲future對象。我們將用它來獲取任務(wù)的結(jié)果。
  2. 同樣的,根據(jù)CPU的情況來創(chuàng)建線程的數(shù)量。
  3. 將任務(wù)包裝成packaged_task。請注意,由于concurrent_worker被包裝成了任務(wù),我們無法直接獲取它的return值。而是要通過future對象來獲取。
  4. 獲取任務(wù)關(guān)聯(lián)的future對象,并將其存入集合中。
  5. 通過一個新的線程來執(zhí)行任務(wù),并傳入需要的參數(shù)。
  6. 通過future集合,逐個獲取每個任務(wù)的計算結(jié)果,將其累加。這里r.get()獲取到的就是每個任務(wù)中concurrent_worker的返回值。

為了簡單起見,這里的示例只使用了我們熟悉的例子和結(jié)構(gòu)。但在實際上的工程中,調(diào)用關(guān)系通常更復(fù)雜,你可以借助于packaged_task將任務(wù)組裝成隊列,然后通過線程池的方式進行調(diào)度:


promise與future


在上面的例子中,concurrent_task的結(jié)果是通過return返回的。但在一些時候,我們可能不能這么做:在得到任務(wù)結(jié)果之后,可能還有一些事情需要繼續(xù)處理,例如清理工作。
這個時候,就可以將promise與future配對使用。這樣就可以將返回結(jié)果和任務(wù)結(jié)束兩個事情分開。
下面是對上面代碼示例的改寫:
		
// 14_promise_future.cpp double concurrent_worker(int min, int max) { double sum = 0; for (int i = min; i <= max; i++) { sum += sqrt(i); } return sum;} void concurrent_task(int min, int max, promise<double>* result) { // ① vectordouble>> results;  unsigned concurrent_count = thread::hardware_concurrency(); min = 0; for (int i = 0; i < concurrent_count; i++) { packaged_task<double(int, int)> task(concurrent_worker); results.push_back(task.get_future());   int range = max / concurrent_count * (i + 1); thread t(std::move(task), min, range); t.detach();  min = range + 1; }  cout << "threads create finish" << endl; double sum = 0; for (auto& r : results) { sum += r.get(); } result->set_value(sum); // ② cout << "concurrent_task finish" << endl;} int main() { auto start_time = chrono::steady_clock::now();  promise<double> sum; // ③ concurrent_task(0, MAX, &sum);  auto end_time = chrono::steady_clock::now(); auto ms = chrono::duration_cast(end_time - start_time).count(); cout << "Concurrent task finish, " << ms << " ms consumed." << endl; cout << "Result: " << sum.get_future().get() << endl; // ④ return 0;}

這段代碼和上面的示例在很大程度上是一樣的。只有小部分內(nèi)容做了改動:
  1. concurrent_task不再直接返回計算結(jié)果,而是增加了一個promise對象來存放結(jié)果。
  2. 在任務(wù)計算完成之后,將總結(jié)過設(shè)置到promise對象上。一旦這里調(diào)用了set_value,其相關(guān)聯(lián)的future對象就會就緒。
  3. 這里是在main中創(chuàng)建一個promoise來存放結(jié)果,并以指針的形式傳遞進concurrent_task中。
  4. 通過sum.get_future().get()來獲取結(jié)果。第2點中已經(jīng)說了:一旦調(diào)用了set_value,其相關(guān)聯(lián)的future對象就會就緒。

需要注意的是,future對象只有被一個線程獲取值。并且在調(diào)用get()之后,就沒有可以獲取的值了。如果從多個線程調(diào)用get()會出現(xiàn)數(shù)據(jù)競爭,其結(jié)果是未定義的。
如果真的需要在多個線程中獲取future的結(jié)果,可以使用shared_future。
并行算法
從C++17開始。和頭文件的中的很多算法都添加了一個新的參數(shù):sequenced_policy。
借助這個參數(shù),開發(fā)者可以直接使用這些算法的并行版本,不用再自己創(chuàng)建并發(fā)系統(tǒng)和劃分數(shù)據(jù)來調(diào)度這些算法。
sequenced_policy可能的取值有三種,它們的說明如下:


注意:本文的前面已經(jīng)提到,目前clang編譯器還不支持這個功能。因此想要編譯這部分代碼,你需要使用gcc 9.0或更高版本,同時還需要安裝Intel Threading Building Blocks。
下面還是通過一個示例來進行說明:
// 15_parallel_algorithm.cpp  void generateRandomData(vector<double>& collection, int size) { random_device rd; mt19937 mt(rd()); uniform_real_distribution<double> dist(1.0, 100.0); for (int i = 0; i < size; i++) { collection.push_back(dist(mt)); }}    int main() { vector<double> collection; generateRandomData(collection, 10e6); // ①   vector<double> copy1(collection); // ② vector<double> copy2(collection); vector<double> copy3(collection);  auto time1 = chrono::steady_clock::now(); // ③ sort(execution::seq, copy1.begin(), copy1.end()); // ④ auto time2 = chrono::steady_clock::now(); auto duration = chrono::duration_cast(time2 - time1).count(); cout << "Sequenced sort consuming " << duration << "ms." << endl; // ⑤   auto time3 = chrono::steady_clock::now(); sort(execution::par, copy2.begin(),copy2.end()); // ⑥ auto time4 = chrono::steady_clock::now(); duration = chrono::duration_cast(time4 - time3).count(); cout << "Parallel sort consuming " << duration << "ms." << endl;   auto time5 = chrono::steady_clock::now(); sort(execution::par_unseq, copy2.begin(),copy2.end()); // ⑦ auto time6 = chrono::steady_clock::now(); duration = chrono::duration_cast(time6 - time5).count(); cout << "Parallel unsequenced sort consuming " << duration << "ms." << endl;}

這段代碼很簡單:
  1. 通過一個函數(shù)生成1000,000個隨機數(shù)。

  2. 將數(shù)據(jù)拷貝3份,以備使用。

  3. 接下來將通過三個不同的parallel_policy參數(shù)來調(diào)用同樣的sort算法。每次調(diào)用記錄開始和結(jié)束的時間。

  4. 第一次調(diào)用使用std::execution::seq參數(shù)。

  5. 輸出本次測試所使用的時間。

  6. 第二次調(diào)用使用std::execution::par參數(shù)。

  7. 第三次調(diào)用使用std::execution::par_unseq參數(shù)。


該程序的輸出如下:
Sequenced sort consuming 4464ms.Parallel sort consuming 459ms.Parallel unsequenced sort consuming 168ms.

可以看到,性能最好的和最差的相差了超過26倍。
結(jié)束語
在本篇文章中,我們介紹了C++語言中新增的并發(fā)編程API。雖然這部分內(nèi)容已經(jīng)不少(大部分人很難一次性搞懂所有這些內(nèi)容,包括我自己),但實際上還有一個很重要的話題我們沒有觸及,那就是“內(nèi)存模型”。
C++內(nèi)存模型是C++11標準中最重要的特性之一。它是多線程環(huán)境能夠可靠工作的基礎(chǔ)??紤]到這部分內(nèi)容還需要比較多的篇幅來說明,因此我們會在下一篇文章中繼續(xù)討論。
本站聲明: 本文章由作者或相關(guān)機構(gòu)授權(quán)發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點,本站亦不保證或承諾內(nèi)容真實性等。需要轉(zhuǎn)載請聯(lián)系該專欄作者,如若文章內(nèi)容侵犯您的權(quán)益,請及時聯(lián)系本站刪除。
關(guān)閉