面試官:說(shuō)說(shuō)Kafka處理請(qǐng)求的全流程
掃描二維碼
隨時(shí)隨地手機(jī)看文章
大家好,我是 yes。
這是我的第三篇Kafka源碼分析文章,前兩篇講了日志段的讀寫(xiě)和二分算法在kakfa索引上的應(yīng)用
今天來(lái)講講 Kafka Broker端處理請(qǐng)求的全流程,剖析下底層的網(wǎng)絡(luò)通信是如何實(shí)現(xiàn)的、Reactor在kafka上的應(yīng)用。
再說(shuō)說(shuō)社區(qū)為何在2.3版本將請(qǐng)求類(lèi)型劃分成兩大類(lèi),又是如何實(shí)現(xiàn)兩類(lèi)請(qǐng)求處理的優(yōu)先級(jí)。
叨叨
不過(guò)在進(jìn)入今天主題之前我想先叨叨幾句,就源碼這個(gè)事兒,不同人有不同的看法。
有些人聽(tīng)到源碼這兩個(gè)詞就被嚇到了,這么多代碼怎么看。奔進(jìn)去就像無(wú)頭蒼蠅,一路斷點(diǎn)跟下來(lái),跳來(lái)跳去,算了拜拜了您嘞。
而有些人覺(jué)得源碼有啥用,看了和沒(méi)看一樣,看了也用不上。
其實(shí)上面兩種想法我都有過(guò),哈哈哈。那為什么我會(huì)開(kāi)始看Kafka源碼呢?
其實(shí)就是我有個(gè)同事在自學(xué)go,然后想用go寫(xiě)個(gè)消息隊(duì)列,在畫(huà)架構(gòu)圖的時(shí)候就來(lái)問(wèn)我,這消息隊(duì)列好像有點(diǎn)東西啊,消息收發(fā),元數(shù)據(jù)管理,消息如何持久一堆問(wèn)題過(guò)來(lái),我直呼頂不住。
這市面上Kafka、RocketMQ都是現(xiàn)成的方案,于是乎我就看起了源碼。
所以促使我看源碼的初始動(dòng)力,竟然是為了在同事前面裝逼!!
我是先看了RocketMQ,因?yàn)楫吘故?code style="font-size: 14px;overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;">Java寫(xiě)的,而Kafka Broker都是scala寫(xiě)的。
梳理了一波RocketMQ之后,我又想看看Kafka是怎么做的,于是乎我又看起了Kafka。
在源碼分析之前我先總結(jié)性的說(shuō)了說(shuō)Kafka底層的通信模型。應(yīng)對(duì)面試官詢(xún)問(wèn)Kafka請(qǐng)求全過(guò)程已經(jīng)夠了。
其實(shí)源碼分析在手機(jī)上看效果欠佳,建議電腦端打開(kāi)觀看。
Reactor模式
在扯到Kafka之前我們先來(lái)說(shuō)說(shuō)Reactor模式,基本上只要是底層的高性能網(wǎng)絡(luò)通信就離不開(kāi)Reactor模式。像Netty、Redis都是使用Reactor模式。
像我們以前剛學(xué)網(wǎng)絡(luò)編程的時(shí)候以下代碼可是非常的熟悉,新來(lái)一個(gè)請(qǐng)求,要么在當(dāng)前線(xiàn)程直接處理了,要么新起一個(gè)線(xiàn)程處理。
在早期這樣的編程是沒(méi)問(wèn)題的,但是隨著互聯(lián)網(wǎng)的快速發(fā)展,單線(xiàn)程處理不過(guò)來(lái),也不能充分的利用計(jì)算機(jī)資源。
而每個(gè)請(qǐng)求都新起一個(gè)線(xiàn)程去處理,資源的要求就太高了,并且創(chuàng)建線(xiàn)程也是一個(gè)重操作。
說(shuō)到這有人想到了,那搞個(gè)線(xiàn)程池不就完事了嘛,還要啥Reactor。
池化技術(shù)確實(shí)能緩解資源的問(wèn)題,但是池子是有限的,池子里的一個(gè)線(xiàn)程不還是得候著某個(gè)連接,等待指示嘛?,F(xiàn)在的互聯(lián)網(wǎng)時(shí)代早已突破C10K了。
因此引入的IO多路復(fù)用,由一個(gè)線(xiàn)程來(lái)監(jiān)視一堆連接,同步等待一個(gè)或多個(gè)IO事件的到來(lái),然后將事件分發(fā)給對(duì)應(yīng)的Handler處理,這就叫Reactor模式。
網(wǎng)絡(luò)通信模型的發(fā)展如下
單線(xiàn)程 => 多線(xiàn)程 => 線(xiàn)程池 => Reactor模型
Kafka所采用的Reactor模型如下
Kafka Broker 網(wǎng)絡(luò)通信模型
簡(jiǎn)單來(lái)說(shuō)就是,Broker 中有個(gè)Acceptor(mainReactor)監(jiān)聽(tīng)新連接的到來(lái),與新連接建連之后輪詢(xún)選擇一個(gè)Processor(subReactor)管理這個(gè)連接。
而Processor會(huì)監(jiān)聽(tīng)其管理的連接,當(dāng)事件到達(dá)之后,讀取封裝成Request,并將Request放入共享請(qǐng)求隊(duì)列中。
然后IO線(xiàn)程池不斷的從該隊(duì)列中取出請(qǐng)求,執(zhí)行真正的處理。處理完之后將響應(yīng)發(fā)送到對(duì)應(yīng)的Processor的響應(yīng)隊(duì)列中,然后由Processor將Response返還給客戶(hù)端。
每個(gè)listener只有一個(gè)Acceptor線(xiàn)程,因?yàn)樗皇亲鳛樾逻B接建連再分發(fā),沒(méi)有過(guò)多的邏輯,很輕量,一個(gè)足矣。
Processor 在Kafka中稱(chēng)之為網(wǎng)絡(luò)線(xiàn)程,默認(rèn)網(wǎng)絡(luò)線(xiàn)程池有3個(gè)線(xiàn)程,對(duì)應(yīng)的參數(shù)是num.network.threads。并且可以根據(jù)實(shí)際的業(yè)務(wù)動(dòng)態(tài)增減。
還有個(gè) IO 線(xiàn)程池,即KafkaRequestHandlerPool,執(zhí)行真正的處理,對(duì)應(yīng)的參數(shù)是num.io.threads,默認(rèn)值是 8。IO線(xiàn)程處理完之后會(huì)將Response放入對(duì)應(yīng)的Processor中,由Processor將響應(yīng)返還給客戶(hù)端。
可以看到網(wǎng)絡(luò)線(xiàn)程和IO線(xiàn)程之間利用的經(jīng)典的生產(chǎn)者 - 消費(fèi)者模式,不論是用于處理Request的共享請(qǐng)求隊(duì)列,還是IO處理完返回的Response。
這樣的好處是什么?生產(chǎn)者和消費(fèi)者之間解耦了,可以對(duì)生產(chǎn)者或者消費(fèi)者做獨(dú)立的變更和擴(kuò)展。并且可以平衡兩者的處理能力,例如消費(fèi)不過(guò)來(lái)了,我多加些IO線(xiàn)程。
如果你看過(guò)其他中間件源碼,你會(huì)發(fā)現(xiàn)生產(chǎn)者-消費(fèi)者模式真的是太常見(jiàn)了,所以面試題經(jīng)常會(huì)有手寫(xiě)一波生產(chǎn)者-消費(fèi)者。
源碼級(jí)別剖析網(wǎng)絡(luò)通信模型
Kafka 網(wǎng)絡(luò)通信組件主要由兩大部分構(gòu)成:
SocketServer 和 KafkaRequestHandlerPool。
SocketServer
可以看出SocketServer旗下管理著,Acceptor 線(xiàn)程、Processor 線(xiàn)程和 RequestChannel等對(duì)象。
data-plane和control-plane稍后再做分析,先看看RequestChannel是什么。
RequestChannel
關(guān)鍵的屬性和方法都已經(jīng)在下面代碼中注釋了,可以看出這個(gè)對(duì)象主要就是管理Processor和作為傳輸Request和Response的中轉(zhuǎn)站。
Acceptor
接下來(lái)我們?cè)倏纯?code style="font-size: 14px;overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;">Acceptor
可以看到它繼承了AbstractServerThread,接下來(lái)再看看它run些啥
再來(lái)看看accept(key) 做了啥
很簡(jiǎn)單,標(biāo)準(zhǔn)selector的處理,獲取準(zhǔn)備就緒事件,調(diào)用serverSocketChannel.accept()得到socketChannel,將socketChannel交給通過(guò)輪詢(xún)選擇出來(lái)的Processor,之后由它來(lái)處理IO事件。
Processor
接下來(lái)我們?cè)倏纯?code style="font-size: 14px;overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;">Processor,相對(duì)而言比Acceptor 復(fù)雜一些。
先來(lái)看看三個(gè)關(guān)鍵的成員
再來(lái)看看主要的處理邏輯。
可以看到Processor主要是將底層讀事件IO數(shù)據(jù)封裝成Request存入隊(duì)列中,然后將IO線(xiàn)程塞入的Response,返還給客戶(hù)端,并處理Response 的回調(diào)邏輯。
KafkaRequestHandlerPool
IO線(xiàn)程池,實(shí)際處理請(qǐng)求的線(xiàn)程。
再來(lái)看看IO線(xiàn)程都干了些啥
很簡(jiǎn)單,核心就是不斷的從requestChannel拿請(qǐng)求,然后調(diào)用handle處理請(qǐng)求。
handle方法是位于KafkaApis類(lèi)中,可以理解為通過(guò)switch,根據(jù)請(qǐng)求頭里面不同的apikey調(diào)用不同的handle來(lái)處理請(qǐng)求。
我們?cè)倥e例看下較為簡(jiǎn)單的處理LIST_OFFSETS的過(guò)程,即handleListOffsetRequest,來(lái)完成一個(gè)請(qǐng)求的閉環(huán)。
我用紅色箭頭標(biāo)示了調(diào)用鏈。表明處理完請(qǐng)求之后是塞給對(duì)應(yīng)的Processor的。
最后再來(lái)個(gè)更詳細(xì)的總覽圖,把源碼分析到的類(lèi)基本上都對(duì)應(yīng)的加上去了。
請(qǐng)求處理優(yōu)先級(jí)
上面提到的data-plane和control-plane是時(shí)候揭開(kāi)面紗了。這兩個(gè)對(duì)應(yīng)的就是數(shù)據(jù)類(lèi)請(qǐng)求和控制類(lèi)請(qǐng)求。
為什么需要分兩類(lèi)請(qǐng)求呢?直接在請(qǐng)求里面用key標(biāo)明請(qǐng)求是要讀寫(xiě)數(shù)據(jù)啊還是更新元數(shù)據(jù)不就行了嗎?
簡(jiǎn)單點(diǎn)的說(shuō)比如我們想刪除某個(gè)topic,我們肯定是想這個(gè)topic馬上被刪除的,而此時(shí)producer還一直往這個(gè)topic寫(xiě)數(shù)據(jù)。
那這個(gè)情況可能是我們的刪除請(qǐng)求排在第N個(gè)...等前面的寫(xiě)入請(qǐng)求處理好了才輪到刪除的請(qǐng)求。實(shí)際上前面那些往這個(gè)topic寫(xiě)入的請(qǐng)求都是沒(méi)用的,平白的消耗資源。
再或者說(shuō)進(jìn)行Preferred Leader選舉時(shí)候,producer將ack設(shè)置為all時(shí)候,老leader還在等著follower寫(xiě)完數(shù)據(jù)向他報(bào)告呢,誰(shuí)知follower已經(jīng)成為了新leader。
而通知它leader已經(jīng)變更的請(qǐng)求由于被一堆數(shù)據(jù)類(lèi)型請(qǐng)求堵著呢,老leader就傻傻的在等著,直到超時(shí)。
就是為了解決這種情況,社區(qū)將請(qǐng)求分為兩類(lèi)。
那如何讓控制類(lèi)的請(qǐng)求優(yōu)先被處理??jī)?yōu)先隊(duì)列?
社區(qū)采取的是兩套Listener,即數(shù)據(jù)類(lèi)型一個(gè)listener,控制類(lèi)一個(gè)listener。
對(duì)應(yīng)的就是我們上面講的網(wǎng)絡(luò)通信模型,在kafka中有兩套! kafka通過(guò)兩套監(jiān)聽(tīng)變相的實(shí)現(xiàn)了請(qǐng)求優(yōu)先級(jí),畢竟數(shù)據(jù)類(lèi)型請(qǐng)求肯定很多,控制類(lèi)肯定少,這樣看來(lái)控制類(lèi)肯定比大部分?jǐn)?shù)據(jù)類(lèi)型先被處理!
迂回戰(zhàn)術(shù)啊。
控制類(lèi)的和數(shù)據(jù)類(lèi)區(qū)別就在于:就一個(gè)Porcessor線(xiàn)程,并且請(qǐng)求隊(duì)列寫(xiě)死的長(zhǎng)度為20,社區(qū)覺(jué)得這樣夠了。
最后
看源碼主要就是得耐心,耐心跟下去。然后再跳出來(lái)看。你會(huì)發(fā)現(xiàn)不過(guò)如此,哈哈哈。
前兩篇由于授權(quán)給他人了,因此公眾號(hào)上發(fā)不了,貼下連接,有興趣的同學(xué)可以看下。
Kafka日志段讀寫(xiě)分析:https://juejin.im/post/5ef6b94ae51d4534a1236cb0
Kafka索引在設(shè)計(jì)有什么亮點(diǎn):https://juejin.im/post/5efdeae7f265da22d017e58d
我是yes,一個(gè)在互聯(lián)網(wǎng)摸爬滾打且莫得感情的工具人。
免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺(tái)僅提供信息存儲(chǔ)服務(wù)。文章僅代表作者個(gè)人觀點(diǎn),不代表本平臺(tái)立場(chǎng),如有問(wèn)題,請(qǐng)聯(lián)系我們,謝謝!





