LevelDB源碼分析之九:env
考慮到移植以及靈活性,LevelDB將系統(tǒng)相關(guān)的處理(文件/進(jìn)程/時(shí)間)抽象成Evn,用戶可以自己實(shí)現(xiàn)相應(yīng)的接口,作為option的一部分傳入,默認(rèn)使用自帶的實(shí)現(xiàn)。?
env.h中聲明了:
虛基類env,在env_posix.cc中,派生類PosixEnv繼承自env類,是LevelDB的默認(rèn)實(shí)現(xiàn)。虛基類WritableFile、SequentialFile、RandomAccessFile,分別是文件的寫抽象類,順序讀抽象類和隨機(jī)讀抽象類類Logger,log文件的寫入接口,log文件是防止系統(tǒng)異常終止造成數(shù)據(jù)丟失,是memtable在磁盤的備份類FileLock,為文件上鎖WriteStringToFile、ReadFileToString、Log三個(gè)全局函數(shù),封裝了上述接口
下面來(lái)看看env_posix.cc中為我們寫好的默認(rèn)實(shí)現(xiàn)
順序讀:
class?PosixSequentialFile:?public?SequentialFile?{
?private:
??std::string?filename_;
??FILE*?file_;
?public:
??PosixSequentialFile(const?std::string&?fname,?FILE*?f)
????:?filename_(fname),?file_(f)?{?}
??virtual?~PosixSequentialFile()?{?fclose(file_);?}
??//?從文件中讀取n個(gè)字節(jié)存放到?"scratch[0..n-1]",?然后將"scratch[0..n-1]"轉(zhuǎn)化為Slice類型并存放到*result中
??//?如果正確讀取,則返回OK?status,否則返回non-OK?status
??virtual?Status?Read(size_t?n,?Slice*?result,?char*?scratch)?{
??Status?s;
#ifdef?BSD
??//?fread_unlocked?doesn't?exist?on?FreeBSD
??size_t?r?=?fread(scratch,?1,?n,?file_);
#else
??//?size_t?fread_unlocked(void?*ptr,?size_t?size,?size_t?n,FILE?*stream);
??//?ptr:用于接收數(shù)據(jù)的內(nèi)存地址
??//?size:要讀的每個(gè)數(shù)據(jù)項(xiàng)的字節(jié)數(shù),單位是字節(jié)
??//?n:要讀n個(gè)數(shù)據(jù)項(xiàng),每個(gè)數(shù)據(jù)項(xiàng)size個(gè)字節(jié)
??//?stream:輸入流
??//?返回值:返回實(shí)際讀取的數(shù)據(jù)大小
??//?因?yàn)楹瘮?shù)名帶了"_unlocked"后綴,所以它不是線程安全的
??size_t?r?=?fread_unlocked(scratch,?1,?n,?file_);
#endif
??//?Slice的第二個(gè)參數(shù)要用實(shí)際讀到的數(shù)據(jù)大小,因?yàn)樽x到文件尾部,剩下的字節(jié)數(shù)可能小于n
??*result?=?Slice(scratch,?r);
??if?(r?<?n)?{
????if?(feof(file_))?{
????//?We?leave?status?as?ok?if?we?hit?the?end?of?the?file
????//?如果r<n,且feof(file_)非零,說(shuō)明到了文件結(jié)尾,什么都不用做,函數(shù)結(jié)束后會(huì)返回OK?Status
????}?else?{
????//?A?partial?read?with?an?error:?return?a?non-ok?status
????//?否則返回錯(cuò)誤信息
????s?=?Status::IOError(filename_,?strerror(errno));
????}
??}
??return?s;
??}
??//?跳過(guò)n字節(jié)的內(nèi)容,這并不比讀取n字節(jié)的內(nèi)容慢,而且會(huì)更快。
??//?如果到達(dá)了文件尾部,則會(huì)停留在文件尾部,并返回OK?Status。
??//?否則,返回錯(cuò)誤信息
??virtual?Status?Skip(uint64_t?n)?{
???//?int?fseek(FILE?*stream,?long?offset,?int?origin);
???//?stream:文件指針
???//?offset:偏移量,整數(shù)表示正向偏移,負(fù)數(shù)表示負(fù)向偏移
???//?origin:設(shè)定從文件的哪里開始偏移,?可能取值為:SEEK_CUR、?SEEK_END?或?SEEK_SET
???//?SEEK_SET:?文件開頭
???//?SEEK_CUR:?當(dāng)前位置
???//?SEEK_END:?文件結(jié)尾
???//?其中SEEK_SET,?SEEK_CUR和SEEK_END和依次為0,1和2.
???//?舉例:
???//?fseek(fp,?100L,?0);?把fp指針移動(dòng)到離文件開頭100字節(jié)處;
???//?fseek(fp,?100L,?1);?把fp指針移動(dòng)到離文件當(dāng)前位置100字節(jié)處;
???//?fseek(fp,?100L,?2);?把fp指針退回到離文件結(jié)尾100字節(jié)處。
???//?返回值:成功返回0,失敗返回非0
??if?(fseek(file_,?n,?SEEK_CUR))?{
????return?Status::IOError(filename_,?strerror(errno));
??}
??return?Status::OK();
??}
};這就是LevelDB從磁盤順序讀取文件的接口了,用的是C的流文件操作和FILE結(jié)構(gòu)體。需要注意的是Read接口讀取文件時(shí)不會(huì)鎖住文件流,因此外部的并發(fā)訪問(wèn)需要自行提供并發(fā)控制。
隨機(jī)讀:
class?PosixRandomAccessFile:?public?RandomAccessFile?{
?private:
??std::string?filename_;
??int?fd_;
??mutable?boost::mutex?mu_;
?public:
??PosixRandomAccessFile(const?std::string&?fname,?int?fd)
????:?filename_(fname),?fd_(fd)?{?}
??virtual?~PosixRandomAccessFile()?{?close(fd_);?}
??//?這里與順序讀的同名函數(shù)相比,多了一個(gè)參數(shù)offset,offset用來(lái)指定
??//?讀取位置距離文件起始位置的偏移量,這樣就可以實(shí)現(xiàn)隨機(jī)讀了。
??virtual?Status?Read(uint64_t?offset,?size_t?n,?Slice*?result,
????????????char*?scratch)?const?{
????Status?s;
#ifdef?WIN32
????//?no?pread?on?Windows?so?we?emulate?it?with?a?mutex
????boost::unique_locklock(mu_);
????if?(::_lseeki64(fd_,?offset,?SEEK_SET)?==?-1L)?{
??????return?Status::IOError(filename_,?strerror(errno));
????}
//?int?_read(int?_FileHandle,?void?*?_DstBuf,?unsigned?int?_MaxCharCount)
//?_FileHandle:文件描述符
//?_DstBuf:保存讀取數(shù)據(jù)的緩沖區(qū)
//?_MaxCharCount:讀取的字節(jié)數(shù)
//?返回值:成功返回讀取的字節(jié)數(shù),出錯(cuò)返回-1并設(shè)置errno。
????int?r?=?::_read(fd_,?scratch,?n);
????*result?=?Slice(scratch,?(r?<?0)???0?:?r);
????lock.unlock();
#else
//?在非windows系統(tǒng)上使用pread進(jìn)行隨機(jī)讀,為何此時(shí)不用鎖呢?詳見下文分析
????ssize_t?r?=?pread(fd_,?scratch,?n,?static_cast(offset));
????*result?=?Slice(scratch,?(r?<?0)???0?:?r);
#endif
????if?(r?<?0)?{
??????//?An?error:?return?a?non-ok?status
??????s?=?Status::IOError(filename_,?strerror(errno));
????}
????return?s;
??}
};可以看到的是,PosixRandomAccessFile 在非windows系統(tǒng)上使用了 pread 來(lái)實(shí)現(xiàn)原子的定位加訪問(wèn)功能。常規(guī)的隨機(jī)訪問(wèn)文件的過(guò)程可以分為兩步,fseek (seek) 定位到訪問(wèn)點(diǎn),調(diào)用 fread (read) 來(lái)從特定位置開始訪問(wèn) FILE* (fd)。然而,這兩個(gè)操作組合在一起并不是原子的,即 fseek 和 fread 之間可能會(huì)插入其他線程的文件操作。相比之下 pread 由系統(tǒng)來(lái)保證實(shí)現(xiàn)原子的定位和讀取組合功能。需要注意的是,pread 操作不會(huì)更新文件指針。
需要注意的是,在隨機(jī)讀和順序讀中,分別用fd和FILE *來(lái)表示一個(gè)文件。文件描述符(file descriptor)是系統(tǒng)層的概念, fd 對(duì)應(yīng)于系統(tǒng)打開文件表里面的一個(gè)文件;FILE* 是應(yīng)用層的概念,其包含了應(yīng)用層操作文件的數(shù)據(jù)結(jié)構(gòu)。
順序?qū)懀?/p>
class?BoostFile?:?public?WritableFile?{
public:
??explicit?BoostFile(std::string?path)?:?path_(path),?written_(0)?{
????Open();
??}
??virtual?~BoostFile()?{
????Close();
??}
private:
??void?Open()?{
????//?we?truncate?the?file?as?implemented?in?env_posix
?//?trunc:先將文件中原有的內(nèi)容清空
?//?out:為輸出(寫)而打開文件
?//?binary:以二進(jìn)制方式打開文件
?????file_.open(path_.generic_string().c_str(),?
?????????std::ios_base::trunc?|?std::ios_base::out?|?std::ios_base::binary);
?????written_?=?0;
??}
public:
??virtual?Status?Append(const?Slice&?data)?{
????Status?result;
????file_.write(data.data(),?data.size());
????if?(!file_.good())?{
??????result?=?Status::IOError(
??????????path_.generic_string()?+?"?Append",?"cannot?write");
????}
????return?result;
??}
??virtual?Status?Close()?{
????Status?result;
????try?{
??????if?(file_.is_open())?{
????????Sync();
//?關(guān)閉流時(shí),緩沖區(qū)中的數(shù)據(jù)會(huì)自動(dòng)寫入到文件
//?上面調(diào)用Sync()強(qiáng)制刷新,是為了確保數(shù)據(jù)寫入,防止數(shù)據(jù)丟失
????????file_.close();
??????}
????}?catch?(const?std::exception?&?e)?{
??????result?=?Status::IOError(path_.generic_string()?+?"?close",?e.what());
????}
????return?result;
??}
??virtual?Status?Flush()?{
????file_.flush();
????return?Status::OK();
??}
??//?手動(dòng)刷新(清空輸出緩沖區(qū),并把緩沖區(qū)內(nèi)容同步到文件)
??virtual?Status?Sync()?{
????Status?result;
????try?{
??????Flush();
????}?catch?(const?std::exception?&?e)?{
??????result?=?Status::IOError(path_.string()?+?"?sync",?e.what());
????}
????return?result;
??}
private:
??boost::filesystem::path?path_;
??boost::uint64_t?written_;
??std::ofstream?file_;
};關(guān)于ofstream::flush和ofstream::Close的區(qū)別,詳見:C++之ofstream::flush與ofstream::close
文件鎖:
class?BoostFileLock?:?public?FileLock?{
?public:
??boost::interprocess::file_lock?fl_;
};virtual?Status?LockFile(const?std::string&?fname,?FileLock**?lock)?{
????*lock?=?NULL;
????Status?result;
????try?{
??????if?(!boost::filesystem::exists(fname))?{
????????std::ofstream?of(fname,?std::ios_base::trunc?|?std::ios_base::out);
??????}
??????assert(boost::filesystem::exists(fname));
??????boost::interprocess::file_lock?fl(fname.c_str());
??????BoostFileLock?*?my_lock?=?new?BoostFileLock();
??????my_lock->fl_?=?std::move(fl);
??????if?(my_lock->fl_.try_lock())
????????*lock?=?my_lock;
??????else
????????result?=?Status::IOError("acquiring?lock?"?+?fname?+?"?failed");
????}?catch?(const?std::exception?&?e)?{
??????result?=?Status::IOError("lock?"?+?fname,?e.what());
????}
????return?result;
??}?virtual?Status?UnlockFile(FileLock*?lock)?{
????Status?result;
????try?{
??????BoostFileLock?*?my_lock?=?static_cast(lock);
??????my_lock->fl_.unlock();
??????delete?my_lock;
????}?catch?(const?std::exception?&?e)?{
??????result?=?Status::IOError("unlock",?e.what());
????}
????return?result;
??}文件的鎖操作是調(diào)用Boost的鎖實(shí)現(xiàn)的。加鎖是為了防止多進(jìn)程的并發(fā)沖突,如果加鎖失敗,*lock=NULL,且返回non-OK;如果加鎖成功,*lock存放的的是鎖的指針,并返回OK。如果進(jìn)程退出,鎖會(huì)自動(dòng)釋放,否則用戶需要調(diào)用UnlockFile顯式的釋放鎖。
這幾個(gè)方法都非常簡(jiǎn)單,比較晦澀的是這句:my_lock->std::move(f1),從函數(shù)名來(lái)看,是要移動(dòng)f1。其實(shí)std::move是C++11標(biāo)準(zhǔn)庫(kù)在
計(jì)劃任務(wù):
PosixEnv還有一個(gè)很重要的功能,計(jì)劃任務(wù),也就是后臺(tái)的compaction線程。compaction就是壓縮合并的意思,在LevelDB源碼分析之六:skiplist(2)中也有提到。對(duì)于LevelDB來(lái)說(shuō),寫入記錄操作很簡(jiǎn)單,刪除記錄僅僅寫入一個(gè)刪除標(biāo)記就算完事,但是讀取記錄比較復(fù)雜,需要在內(nèi)存以及各個(gè)層級(jí)文件中依照新鮮程度依次查找,代價(jià)很高。為了加快讀取速度,LevelDB采取了compaction的方式來(lái)對(duì)已有的記錄進(jìn)行整理壓縮,通過(guò)這種方式,來(lái)刪除掉一些不再有效的KV數(shù)據(jù),減小數(shù)據(jù)規(guī)模,減少文件數(shù)量等。
PosixEnv中定義了一個(gè)任務(wù)隊(duì)列:
??struct?BGItem?{?void*?arg;?void?(*function)(void*);?};
??//用的是deque雙端隊(duì)列作為底層的數(shù)據(jù)結(jié)構(gòu)
??typedef?std::dequeBGQueue;
??BGQueue?queue_;主線程一旦判定需要進(jìn)行compaction操作,就把compaction任務(wù)壓進(jìn)隊(duì)列queue_中,BGItem是存有任務(wù)函數(shù)和db對(duì)象指針的結(jié)構(gòu)。而后臺(tái)線程從一開始就不斷根據(jù)隊(duì)列中的函數(shù)指針執(zhí)行compaction任務(wù)。BGThread()函數(shù)就是不停的在queue_中取出函數(shù)指針,執(zhí)行。
后臺(tái)進(jìn)程一直執(zhí)行queue_中的任務(wù),由于queue_是動(dòng)態(tài)的,自然需要考慮queue_空了怎么辦,LevelDB采用的是條件變量boost::condition_variable bgsignal_,隊(duì)列空了就進(jìn)入等待,直至有新的任務(wù)加入進(jìn)來(lái)。而條件變量一般是要和boost::mutex mu_搭配使用,防止某些邏輯錯(cuò)誤。
?//?BGThread函數(shù)的包裝,里面調(diào)用的就是BGThread函數(shù)
??static?void*?BGThreadWrapper(void*?arg)?{
????reinterpret_cast(arg)->BGThread();
????return?NULL;
??}void?PosixEnv::Schedule(void?(*function)(void*),?void*?arg)?{
??boost::unique_locklock(mu_);
??//?Start?background?thread?if?necessary
??if?(!bgthread_)?{
?????bgthread_.reset(
?????????new?boost::thread(boost::bind(&PosixEnv::BGThreadWrapper,?this)));
??}
??//?Add?to?priority?queue
??//?將任務(wù)壓進(jìn)隊(duì)列中
??queue_.push_back(BGItem());
??queue_.back().function?=?function;
??queue_.back().arg?=?arg;
??lock.unlock();
??bgsignal_.notify_one();
}void?PosixEnv::BGThread()?{
??while?(true)?{
??//?加鎖,防止并發(fā)沖突
??boost::unique_locklock(mu_);
??//?如果隊(duì)列為空,等待,直到收到通知(notification)
??while?(queue_.empty())?{
????bgsignal_.wait(lock);
??}
??//?從隊(duì)列頭取出任務(wù)的函數(shù)及其參數(shù)
??void?(*function)(void*)?=?queue_.front().function;
??void*?arg?=?queue_.front().arg;
??queue_.pop_front();
??lock.unlock();
??//?調(diào)用函數(shù)
??(*function)(arg);
??}
}此外PosixEnv中還有FileExists、GetChildren、DeleteFile、CreateDir、DeleteDir、GetFileSize、RenameFile等等函數(shù),他們見名知義,都是調(diào)用Boot的相應(yīng)函數(shù)實(shí)現(xiàn)的。
EnvWrapper:
在levelDB中還實(shí)現(xiàn)了一個(gè)EnvWrapper類,該類繼承自Env,且只有一個(gè)成員函數(shù)Env* target_,該類的所有變量都調(diào)用Env類相應(yīng)的成員變量,我們知道,Env是一個(gè)抽象類,是不能定義Env 類型的對(duì)象的。我們傳給EnvWrapper 的構(gòu)造函數(shù)的類型是PosixEnv,所以,最后調(diào)用的都是PosixEnv類的成員變量,你可能已經(jīng)猜到了,這就是設(shè)計(jì)模式中的代理模式,EnvWrapper只是進(jìn)行了簡(jiǎn)單的封裝,它的代理了Env的子類PosixEnv。
EnvWrapper和Env與PosixEnv的關(guān)系如下:
由于篇幅限制,Env中的Logger類就放在后面分析了,參考:LevelDB源碼分析之十:LOG文件,從env給我的收獲就是:
利用虛基類的特性提供了默認(rèn)的實(shí)現(xiàn),也開放了用戶自定義操作的權(quán)限面向?qū)ο缶幊谭妒降膶W(xué)習(xí),把一切操作定義成類文件的加鎖解鎖,線程的同步C的文件流操作,對(duì)文件名的字符提取操作,創(chuàng)建、刪除文件和路徑,這些都可以直接用到將來(lái)自己的項(xiàng)目中參考.





