學(xué)到羊之Kafka-創(chuàng)新互聯(lián)

改則網(wǎng)站建設(shè)公司創(chuàng)新互聯(lián)公司,改則網(wǎng)站設(shè)計(jì)制作,有大型網(wǎng)站制作公司豐富經(jīng)驗(yàn)。已為改則上千提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\外貿(mào)網(wǎng)站制作要多少錢,請找那個(gè)售后服務(wù)好的改則做網(wǎng)站的公司定做!1 kafka 是啥

Kafka 是一款開源的消息引擎系統(tǒng),用來實(shí)現(xiàn)解耦的異步式數(shù)據(jù)傳遞。即系統(tǒng) A 發(fā)消息給到 消息引擎系統(tǒng),系統(tǒng) B 通過消息引擎系統(tǒng)讀取 A 發(fā)送的消息,在大數(shù)據(jù)場景下,能達(dá)到削峰填谷的效果。

2 Kafka 術(shù)語

Kafka 中的分區(qū)機(jī)制指的是將每個(gè)主題(Topic)劃分成多個(gè)分區(qū)(Partition),每個(gè)分區(qū)是一組有序的消息日志。生產(chǎn)者生產(chǎn)的每條消息只會被發(fā)送到一個(gè)分區(qū)中,也就是說如果向一個(gè)雙分區(qū)的主題發(fā)送一條消息,這條消息要么在分區(qū) 0 中,要么在分區(qū) 1 中。Kafka 的分區(qū)編號是從 0 開始的,如果 Topic 有 100 個(gè)分區(qū),那么它們的分區(qū)號就是從 0 到 99。每個(gè)分區(qū)下可以配置若干個(gè)副本,其中只能有 1 個(gè)領(lǐng)導(dǎo)者副本和 N-1 個(gè)追隨者副本。

Kafka 的三層消息架構(gòu):

1)主題層,每個(gè)主題可以配置 M 個(gè)分區(qū),而每個(gè)分區(qū)又可以配置 N 個(gè)副本。

2)分區(qū)層,每個(gè)分區(qū)的 N 個(gè)副本中只能有一個(gè)充當(dāng)領(lǐng)導(dǎo)者角色,對外提供服務(wù);其他 N-1 個(gè)副本是追隨者副本,只是提供數(shù)據(jù)冗余之用。

3)消息層,分區(qū)中包含若干條消息,每條消息的位移從 0 開始,依次遞增。最后,客戶端程序只能與分區(qū)的領(lǐng)導(dǎo)者副本進(jìn)行交互。

Broker 如何持久化數(shù)據(jù)?

Kafka 使用消息日志(Log)來保存數(shù)據(jù),一個(gè)日志就是磁盤上一個(gè)只能追加寫(Append-only)消息的物理文件。因?yàn)橹荒茏芳訉懭?,故避免了緩慢的隨機(jī) I/O 操作,改為性能較好的順序 I/O 寫操作,這也是實(shí)現(xiàn) Kafka 高吞吐量特性的一個(gè)重要手段。如果不停地向一個(gè)日志寫入消息,最終也會耗盡所有的磁盤空間,因此 Kafka 必然要定期地刪除消息以回收磁盤。怎么刪除呢?簡單來說就是通過日志段(Log Segment)機(jī)制。在 Kafka 底層,一個(gè)日志又進(jìn)一步細(xì)分成多個(gè)日志段,消息被追加寫到當(dāng)前最新的日志段中,當(dāng)寫滿了一個(gè)日志段后,Kafka 會自動(dòng)切分出一個(gè)新的日志段,并將老的日志段封存起來。Kafka 在后臺還有定時(shí)任務(wù)會定期地檢查老的日志段是否能夠被刪除,從而實(shí)現(xiàn)回收磁盤空間的目的。

3 生產(chǎn)者 3.1 消息發(fā)送

  1. Producer創(chuàng)建時(shí),會創(chuàng)建一個(gè)Sender線程并設(shè)置為守護(hù)線程;

  2. 生產(chǎn)消息時(shí),內(nèi)部是異步流程。生產(chǎn)的消息先經(jīng)過攔截器->序列化器->分區(qū)器,然后將消息緩存在緩沖區(qū)(該緩沖區(qū)也是在Producer創(chuàng)建時(shí)創(chuàng)建);

  3. 批次發(fā)送的條件為:緩沖區(qū)數(shù)據(jù)大小達(dá)到 batch.size 或者 linger.ms 達(dá)到上限,哪個(gè)先達(dá)到就算哪個(gè);

  4. 批次發(fā)送后,發(fā)往指定分區(qū),然后落盤到broker;如果生產(chǎn)者配置了 retrires 參數(shù)大于 0 并且失敗原因允許重試,那么客戶端內(nèi)部會對該消息進(jìn)行重試;

  5. 落盤到broker成功,返回生產(chǎn)元數(shù)據(jù)給生產(chǎn)者;

  6. 元數(shù)據(jù)返回有兩種方式:一種是通過阻塞直接返回,另一種是通過回調(diào)返回。

3.2 原理剖析

3.3 分區(qū)機(jī)制

主題是承載真實(shí)數(shù)據(jù)的邏輯容器,而在主題之下還分為若干個(gè)分區(qū),主題下的每條消息只會保存在某一個(gè)分區(qū)中,而不會在多個(gè)分區(qū)中被保存多份。

為什么使用分區(qū)的概念而不是直接使用多個(gè)主題呢?

對數(shù)據(jù)進(jìn)行分區(qū)的主要原因是為了實(shí)現(xiàn)系統(tǒng)的高伸縮性(Scalability)。不同的分區(qū)能夠被放置到不同節(jié)點(diǎn)的機(jī)器上,而數(shù)據(jù)的讀寫操作也都是針對分區(qū)這個(gè)粒度而進(jìn)行的,這樣每個(gè)節(jié)點(diǎn)的機(jī)器都能獨(dú)立地執(zhí)行各自分區(qū)的讀寫請求處理。并且,還可以通過添加新的節(jié)點(diǎn)機(jī)器來增加整體系統(tǒng)的吞吐量。

3.3.1 分區(qū)策略

所謂分區(qū)策略是決定生產(chǎn)者將消息發(fā)送到哪個(gè)分區(qū)的算法。

輪詢策略

順序分配。比如一個(gè)主題下有 3 個(gè)分區(qū),那么第一條消息被發(fā)送到分區(qū) 0,第二條被發(fā)送到分區(qū) 1,第三條被發(fā)送到分區(qū) 2,以此類推。當(dāng)生產(chǎn)第 4 條消息時(shí)又會重新開始,即將其分配到分區(qū) 0。

輪詢策略有非常優(yōu)秀的負(fù)載均衡表現(xiàn),它總是能保證消息大限度地被平均分配到所有分區(qū)上,故默認(rèn)情況下它是最合理的分區(qū)策略,也是最常用的分區(qū)策略之一。?

隨機(jī)策略

隨意地將消息放置到任意一個(gè)分區(qū)上

Key-ordering 策略

Kafka 允許為每條消息定義消息鍵,簡稱為 Key。這個(gè) Key 可以是一個(gè)有著明確業(yè)務(wù)含義的字符串,比如客戶代碼、部門編號或是業(yè)務(wù) ID 等。一旦消息被定義了 Key,就可以保證同一個(gè) Key 的所有消息都進(jìn)入到相同的分區(qū)里面,由于每個(gè)分區(qū)下的消息處理都是有順序的。

假設(shè)有一個(gè)服務(wù)需要監(jiān)聽某個(gè)公眾號用戶關(guān)注取關(guān)的事件,發(fā)送的消息必須要保證有序性,不然會導(dǎo)致結(jié)果混亂。如果給 Kafka 主題只設(shè)置 1 個(gè)分區(qū),這樣所有的消息都只在這一個(gè)分區(qū)內(nèi)讀寫,因此保證了全局的順序性。

這樣做雖然實(shí)現(xiàn)了因果關(guān)系的順序性,但也喪失了 Kafka 多分區(qū)帶來的高吞吐量和負(fù)載均衡的優(yōu)勢。

可以在消息體中封裝了固定的標(biāo)志位,并對此標(biāo)志位設(shè)定專門的分區(qū)策略,保證同一標(biāo)志位的所有消息都發(fā)送到同一分區(qū),這樣既可以保證分區(qū)內(nèi)的消息順序,也可以享受到多分區(qū)帶來的性能紅利。

4 消費(fèi)者 4.1?消費(fèi)組(Consumer Group)

消費(fèi)組是 kafka 提供的可擴(kuò)展且具有容錯(cuò)性的消費(fèi)者機(jī)制,是 Kafka 實(shí)現(xiàn)單播和廣播兩種消息模型的手段。

多個(gè)從同一個(gè)主題消費(fèi)的消費(fèi)者可以加入到一個(gè)消費(fèi)組中,消費(fèi)組中的消費(fèi)者共享 Group Id。組內(nèi)的所有消費(fèi)者協(xié)調(diào)在一起來消費(fèi)訂閱主題的所有分區(qū),每個(gè)分區(qū)只能由同一個(gè)消費(fèi)者組內(nèi)的一個(gè) Consumer 實(shí)例來消費(fèi)。

4.2?消費(fèi)消息

Consumer 采用 pull 模式從 broker 中讀取數(shù)據(jù),可以自主控制消費(fèi)方式,逐條消費(fèi)或批量消費(fèi)。

4.2.1 位移提交

Consumer 需要向 Kafka 記錄自己的位移數(shù)據(jù),這個(gè)匯報(bào)過程稱為 提交位移(Committing Offsets)。這個(gè)過程非常靈活,可以提交任何位移值,但也會由此產(chǎn)生系列不好的結(jié)果。假設(shè)?Consumer 消費(fèi)了 10 條消息,提交的位移值卻是 20,那么位移介于 11~19 之間的消息是有可能丟失的;相反地,如果提交的位移值是 5,那么位移介于 5~9 之間的消息就有可能被重復(fù)消費(fèi)。

自動(dòng)提交

1)開啟自動(dòng)提交: enable.auto.commit=true,默認(rèn)為 true

2)配置自動(dòng)提交間隔: auto.commit.interval.ms ,默認(rèn) 5s

自動(dòng)提交會導(dǎo)致消息被重復(fù)消費(fèi)

  • Consumer 每 5s 提交 offset
  • 假設(shè)提交 offset 后的 3s 發(fā)生了 Rebalance
  • Rebalance 之后的所有 Consumer 從上一次提交的 offset 處繼續(xù)消費(fèi)
  • 因此 Rebalance 發(fā)生前 3s 的消息會被重復(fù)消費(fèi)

雖然能通過減少 auto.commit.interval.ms 的值來提高提交頻率,但這么做只能縮小重復(fù)消費(fèi)的時(shí)間窗口,不可能完全消除它。

手動(dòng)同步提交

使用 KafkaConsumer#commitSync(),會提交 KafkaConsumer#poll() 返回的最新 offset。

該方法為同步操作,等待直到 offset 被成功提交才返回。

while (true) {
    ConsumerRecordsrecords =
consumer.poll(Duration.ofSeconds(1)); process(records); // 處理消息
try {
        // Consumer 程序會處于阻塞狀態(tài),直到遠(yuǎn)端的 Broker 返回提交結(jié)果
        consumer.commitSync();
    } catch (CommitFailedException e) {
        // 處理提交失敗異常
        handle(e);  
    }
}

手動(dòng)異步提交

while (true) {
            ConsumerRecordsrecords = 
  consumer.poll(Duration.ofSeconds(1));
            // 處理消息
            process(records); 
            // 會立即返回結(jié)果,不會阻塞
            consumer.commitAsync((offsets, exception) ->{
  if (exception != null)
            handle(exception);
  });
}

但?commitAsync 不能替代?commitSync,因?yàn)槌霈F(xiàn)問題時(shí)它不會自動(dòng)重試。由于是異步操作,倘若提交失敗后自動(dòng)重試,那么它重試時(shí)提交的位移值可能早已經(jīng)“過期”或不是最新值了。因此,異步提交的重試其實(shí)沒有意義,所以 commitAsync 是不會重試的。

手動(dòng)同步提交與異步提交結(jié)合

try {
            while (true) {
                ConsumerRecordsrecords =
                        consumer.poll(Duration.ofSeconds(1));
                // 處理消息
                process(records);
                // 使用異步提交規(guī)避阻塞
                commitAysnc();
            }
        } catch (Exception e) {
            // 處理異常
            handle(e);
        } finally {
            try {
                // Consumer 要關(guān)閉前使用同步阻塞式提交,以確保 Consumer 關(guān)閉前能夠保存正確的位移數(shù)據(jù)
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
4.2.2 位移管理

Kafka默認(rèn)定期自動(dòng)提交位移( enable.auto.commit = true ),也手動(dòng)提交位移。另外kafka會定期把group消費(fèi)情況保存起來,做成一個(gè)offset map?

位移管理機(jī)制將 Consumer 的位移數(shù)據(jù)作為一條條普通的 Kafka 消息,提交到 __consumer_offsets 主題中。

5 異常處理

如何保證消息不被重復(fù)消費(fèi)?

如何保證消息消費(fèi)的冪等性?

如何防止消息丟失?

如何處理消息積壓?

消費(fèi)慢了怎么處理?

你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧

名稱欄目:學(xué)到羊之Kafka-創(chuàng)新互聯(lián)
分享URL:http://bm7419.com/article10/dscego.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站內(nèi)鏈品牌網(wǎng)站建設(shè)、小程序開發(fā)、網(wǎng)站維護(hù)、手機(jī)網(wǎng)站建設(shè)、面包屑導(dǎo)航

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)站建設(shè)公司