分布式開(kāi)放消息系統(tǒng)RocketMQ的原理分析

這篇文章將為大家詳細(xì)講解有關(guān)分布式開(kāi)放消息系統(tǒng)RocketMQ的原理分析,小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。

十余年的詔安網(wǎng)站建設(shè)經(jīng)驗(yàn),針對(duì)設(shè)計(jì)、前端、開(kāi)發(fā)、售后、文案、推廣等六對(duì)一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。成都全網(wǎng)營(yíng)銷推廣的優(yōu)勢(shì)是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動(dòng)調(diào)整詔安建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無(wú)論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。創(chuàng)新互聯(lián)從事“詔安網(wǎng)站設(shè)計(jì)”,“詔安網(wǎng)站推廣”以來(lái),每個(gè)客戶項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。

關(guān)鍵特性以及其實(shí)現(xiàn)原理

一、順序消息

消息有序指的是可以按照消息的發(fā)送順序來(lái)消費(fèi)。例如:一筆訂單產(chǎn)生了 3 條消息,分別是訂單創(chuàng)建、訂單付款、訂單完成。消費(fèi)時(shí),要按照順序依次消費(fèi)才有意義。與此同時(shí)多筆訂單之間又是可以并行消費(fèi)的。首先來(lái)看如下示例:

假如生產(chǎn)者產(chǎn)生了2條消息:M1、M2,要保證這兩條消息的順序,應(yīng)該怎樣做?你腦中想到的可能是這樣:

分布式開(kāi)放消息系統(tǒng)RocketMQ的原理分析

假定M1發(fā)送到S1,M2發(fā)送到S2,如果要保證M1先于M2被消費(fèi),那么需要M1到達(dá)消費(fèi)端被消費(fèi)后,通知S2,然后S2再將M2發(fā)送到消費(fèi)端。

這個(gè)模型存在的問(wèn)題是,如果M1和M2分別發(fā)送到兩臺(tái)Server上,就不能保證M1先達(dá)到MQ集群,也不能保證M1被先消費(fèi)。換個(gè)角度看,如果M2先于M1達(dá)到MQ集群,甚至M2被消費(fèi)后,M1才達(dá)到消費(fèi)端,這時(shí)消息也就亂序了,說(shuō)明以上模型是不能保證消息的順序的。如何才能在MQ集群保證消息的順序?一種簡(jiǎn)單的方式就是將M1、M2發(fā)送到同一個(gè)Server上:

分布式開(kāi)放消息系統(tǒng)RocketMQ的原理分析

這樣可以保證M1先于M2到達(dá)MQServer(生產(chǎn)者等待M1發(fā)送成功后再發(fā)送M2),根據(jù)先達(dá)到先被消費(fèi)的原則,M1會(huì)先于M2被消費(fèi),這樣就保證了消息的順序。

這個(gè)模型也僅僅是理論上可以保證消息的順序,在實(shí)際場(chǎng)景中可能會(huì)遇到下面的問(wèn)題:

分布式開(kāi)放消息系統(tǒng)RocketMQ的原理分析

只要將消息從一臺(tái)服務(wù)器發(fā)往另一臺(tái)服務(wù)器,就會(huì)存在網(wǎng)絡(luò)延遲問(wèn)題。如上圖所示,如果發(fā)送M1耗時(shí)大于發(fā)送M2的耗時(shí),那么M2就仍將被先消費(fèi),仍然不能保證消息的順序。即使M1和M2同時(shí)到達(dá)消費(fèi)端,由于不清楚消費(fèi)端1和消費(fèi)端2的負(fù)載情況,仍然有可能出現(xiàn)M2先于M1被消費(fèi)的情況。

那如何解決這個(gè)問(wèn)題?將M1和M2發(fā)往同一個(gè)消費(fèi)者,且發(fā)送M1后,需要消費(fèi)端響應(yīng)成功后才能發(fā)送M2。

聰明的你可能已經(jīng)想到另外的問(wèn)題:如果M1被發(fā)送到消費(fèi)端后,消費(fèi)端1沒(méi)有響應(yīng),那是繼續(xù)發(fā)送M2呢,還是重新發(fā)送M1?一般為了保證消息一定被消費(fèi),肯定會(huì)選擇重發(fā)M1到另外一個(gè)消費(fèi)端2,就如下圖所示。

分布式開(kāi)放消息系統(tǒng)RocketMQ的原理分析

這樣的模型就嚴(yán)格保證消息的順序,細(xì)心的你仍然會(huì)發(fā)現(xiàn)問(wèn)題,消費(fèi)端1沒(méi)有響應(yīng)Server時(shí)有兩種情況,一種是M1確實(shí)沒(méi)有到達(dá)(數(shù)據(jù)在網(wǎng)絡(luò)傳送中丟失),另外一種消費(fèi)端已經(jīng)消費(fèi)M1且已經(jīng)發(fā)送響應(yīng)消息,只是MQ Server端沒(méi)有收到。如果是第二種情況,重發(fā)M1,就會(huì)造成M1被重復(fù)消費(fèi)。也就引入了我們要說(shuō)的第二個(gè)問(wèn)題,消息重復(fù)問(wèn)題,這個(gè)后文會(huì)詳細(xì)講解。

回過(guò)頭來(lái)看消息順序問(wèn)題,嚴(yán)格的順序消息非常容易理解,也可以通過(guò)文中所描述的方式來(lái)簡(jiǎn)單處理。總結(jié)起來(lái),要實(shí)現(xiàn)嚴(yán)格的順序消息,簡(jiǎn)單且可行的辦法就是:

      保證生產(chǎn)者 - MQServer - 消費(fèi)者是一對(duì)一對(duì)一的關(guān)系


這樣的設(shè)計(jì)雖然簡(jiǎn)單易行,但也會(huì)存在一些很嚴(yán)重的問(wèn)題,比如:

  1. 并行度就會(huì)成為消息系統(tǒng)的瓶頸(吞吐量不夠)

  2. 更多的異常處理,比如:只要消費(fèi)端出現(xiàn)問(wèn)題,就會(huì)導(dǎo)致整個(gè)處理流程阻塞,我們不得不花費(fèi)更多的精力來(lái)解決阻塞的問(wèn)題。

但我們的最終目標(biāo)是要集群的高容錯(cuò)性和高吞吐量。這似乎是一對(duì)不可調(diào)和的矛盾,那么阿里是如何解決的?

      世界上解決一個(gè)計(jì)算機(jī)問(wèn)題最簡(jiǎn)單的方法:“恰好”不需要解決它!—— 沈詢

有些問(wèn)題,看起來(lái)很重要,但實(shí)際上我們可以通過(guò)合理的設(shè)計(jì)或者將問(wèn)題分解來(lái)規(guī)避。如果硬要把時(shí)間花在解決問(wèn)題本身,實(shí)際上不僅效率低下,而且也是一種浪費(fèi)。從這個(gè)角度來(lái)看消息的順序問(wèn)題,我們可以得出兩個(gè)結(jié)論:

  1. 不關(guān)注亂序的應(yīng)用實(shí)際大量存在

  2. 隊(duì)列無(wú)序并不意味著消息無(wú)序

所以從業(yè)務(wù)層面來(lái)保證消息的順序而不僅僅是依賴于消息系統(tǒng),是不是我們應(yīng)該尋求的一種更合理的方式?最后我們從源碼角度分析RocketMQ怎么實(shí)現(xiàn)發(fā)送順序消息。

RocketMQ通過(guò)輪詢所有隊(duì)列的方式來(lái)確定消息被發(fā)送到哪一個(gè)隊(duì)列(負(fù)載均衡策略)。比如下面的示例中,訂單號(hào)相同的消息會(huì)被先后發(fā)送到同一個(gè)隊(duì)列中:

// RocketMQ通過(guò)MessageQueueSelector中實(shí)現(xiàn)的算法來(lái)確定消息發(fā)送到哪一個(gè)隊(duì)列上
// RocketMQ默認(rèn)提供了兩種MessageQueueSelector實(shí)現(xiàn):隨機(jī)/Hash
// 當(dāng)然你可以根據(jù)業(yè)務(wù)實(shí)現(xiàn)自己的MessageQueueSelector來(lái)決定消息按照何種策略發(fā)送到消息隊(duì)列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);

在獲取到路由信息以后,會(huì)根據(jù)MessageQueueSelector實(shí)現(xiàn)的算法來(lái)選擇一個(gè)隊(duì)列,同一個(gè)OrderId獲取到的肯定是同一個(gè)隊(duì)列。

private SendResult send()  {
    // 獲取topic路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        // 根據(jù)我們的算法,選擇一個(gè)發(fā)送隊(duì)列
        // 這里的arg = orderId
        mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
        if (mq != null) {
            return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
        }
    }
}

二、消息重復(fù)

上面在解決消息順序問(wèn)題時(shí),引入了一個(gè)新的問(wèn)題,就是消息重復(fù)。那么RocketMQ是怎樣解決消息重復(fù)的問(wèn)題呢?還是“恰好”不解決。

造成消息重復(fù)的根本原因是:網(wǎng)絡(luò)不可達(dá)。只要通過(guò)網(wǎng)絡(luò)交換數(shù)據(jù),就無(wú)法避免這個(gè)問(wèn)題。所以解決這個(gè)問(wèn)題的辦法就是繞過(guò)這個(gè)問(wèn)題。那么問(wèn)題就變成了:如果消費(fèi)端收到兩條一樣的消息,應(yīng)該怎樣處理?

  1. 消費(fèi)端處理消息的業(yè)務(wù)邏輯保持冪等性

  2. 保證每條消息都有唯一編號(hào)且保證消息處理成功與去重表的日志同時(shí)出現(xiàn)

第1條很好理解,只要保持冪等性,不管來(lái)多少條重復(fù)消息,最后處理的結(jié)果都一樣。第2條原理就是利用一張日志表來(lái)記錄已經(jīng)處理成功的消息的ID,如果新到的消息ID已經(jīng)在日志表中,那么就不再處理這條消息。

第1條解決方案,很明顯應(yīng)該在消費(fèi)端實(shí)現(xiàn),不屬于消息系統(tǒng)要實(shí)現(xiàn)的功能。第2條可以消息系統(tǒng)實(shí)現(xiàn),也可以業(yè)務(wù)端實(shí)現(xiàn)。正常情況下出現(xiàn)重復(fù)消息的概率其實(shí)很小,如果由消息系統(tǒng)來(lái)實(shí)現(xiàn)的話,肯定會(huì)對(duì)消息系統(tǒng)的吞吐量和高可用有影響,所以最好還是由業(yè)務(wù)端自己處理消息重復(fù)的問(wèn)題,這也是RocketMQ不解決消息重復(fù)的問(wèn)題的原因。

RocketMQ不保證消息不重復(fù),如果你的業(yè)務(wù)需要保證嚴(yán)格的不重復(fù)消息,需要你自己在業(yè)務(wù)端去重。

三、事務(wù)消息

RocketMQ除了支持普通消息,順序消息,另外還支持事務(wù)消息。首先討論一下什么是事務(wù)消息以及支持事務(wù)消息的必要性。我們以一個(gè)轉(zhuǎn)帳的場(chǎng)景為例來(lái)說(shuō)明這個(gè)問(wèn)題:Bob向Smith轉(zhuǎn)賬100塊。

在單機(jī)環(huán)境下,執(zhí)行事務(wù)的情況,大概是下面這個(gè)樣子:

分布式開(kāi)放消息系統(tǒng)RocketMQ的原理分析

當(dāng)用戶增長(zhǎng)到一定程度,Bob和Smith的賬戶及余額信息已經(jīng)不在同一臺(tái)服務(wù)器上了,那么上面的流程就變成了這樣:

分布式開(kāi)放消息系統(tǒng)RocketMQ的原理分析

這時(shí)候你會(huì)發(fā)現(xiàn),同樣是一個(gè)轉(zhuǎn)賬的業(yè)務(wù),在集群環(huán)境下,耗時(shí)居然成倍的增長(zhǎng),這顯然是不能夠接受的。那如何來(lái)規(guī)避這個(gè)問(wèn)題?

      大事務(wù) = 小事務(wù) + 異步

將大事務(wù)拆分成多個(gè)小事務(wù)異步執(zhí)行。這樣基本上能夠?qū)⒖鐧C(jī)事務(wù)的執(zhí)行效率優(yōu)化到與單機(jī)一致。轉(zhuǎn)賬的事務(wù)就可以分解成如下兩個(gè)小事務(wù):

分布式開(kāi)放消息系統(tǒng)RocketMQ的原理分析

圖中執(zhí)行本地事務(wù)(Bob賬戶扣款)和發(fā)送異步消息應(yīng)該保證同時(shí)成功或者同時(shí)失敗,也就是扣款成功了,發(fā)送消息一定要成功,如果扣款失敗了,就不能再發(fā)送消息。那問(wèn)題是:我們是先扣款還是先發(fā)送消息呢?

首先看下先發(fā)送消息的情況,大致的示意圖如下:

分布式開(kāi)放消息系統(tǒng)RocketMQ的原理分析

存在的問(wèn)題是:如果消息發(fā)送成功,但是扣款失敗,消費(fèi)端就會(huì)消費(fèi)此消息,進(jìn)而向Smith賬戶加錢。

先發(fā)消息不行,那就先扣款吧,大致的示意圖如下:

分布式開(kāi)放消息系統(tǒng)RocketMQ的原理分析

存在的問(wèn)題跟上面類似:如果扣款成功,發(fā)送消息失敗,就會(huì)出現(xiàn)Bob扣錢了,但是Smith賬戶未加錢。

可能大家會(huì)有很多的方法來(lái)解決這個(gè)問(wèn)題,比如:直接將發(fā)消息放到Bob扣款的事務(wù)中去,如果發(fā)送失敗,拋出異常,事務(wù)回滾。這樣的處理方式也符合“恰好”不需要解決的原則。

這里需要說(shuō)明一下:如果使用Spring來(lái)管理事物的話,大可以將發(fā)送消息的邏輯放到本地事物中去,發(fā)送消息失敗拋出異常,Spring捕捉到異常后就會(huì)回滾此事物,以此來(lái)保證本地事物與發(fā)送消息的原子性。

RocketMQ支持事務(wù)消息,下面來(lái)看看RocketMQ是怎樣來(lái)實(shí)現(xiàn)的。

分布式開(kāi)放消息系統(tǒng)RocketMQ的原理分析

RocketMQ第一階段發(fā)送Prepared消息時(shí),會(huì)拿到消息的地址,第二階段執(zhí)行本地事物,第三階段通過(guò)第一階段拿到的地址去訪問(wèn)消息,并修改消息的狀態(tài)。

細(xì)心的你可能又發(fā)現(xiàn)問(wèn)題了,如果確認(rèn)消息發(fā)送失敗了怎么辦?RocketMQ會(huì)定期掃描消息集群中的事物消息,如果發(fā)現(xiàn)了Prepared消息,它會(huì)向消息發(fā)送端(生產(chǎn)者)確認(rèn),Bob的錢到底是減了還是沒(méi)減呢?如果減了是回滾還是繼續(xù)發(fā)送確認(rèn)消息呢?RocketMQ會(huì)根據(jù)發(fā)送端設(shè)置的策略來(lái)決定是回滾還是繼續(xù)發(fā)送確認(rèn)消息。這樣就保證了消息發(fā)送與本地事務(wù)同時(shí)成功或同時(shí)失敗。

那我們來(lái)看下RocketMQ源碼,是如何處理事務(wù)消息的??蛻舳税l(fā)送事務(wù)消息的部分(完整代碼請(qǐng)查看:rocketmq-example工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer):

// =============================發(fā)送事務(wù)消息的一系列準(zhǔn)備工作========================================
// 未決事務(wù),MQ服務(wù)器回查客戶端
// 也就是上文所說(shuō)的,當(dāng)RocketMQ發(fā)現(xiàn)`Prepared消息`時(shí),會(huì)根據(jù)這個(gè)Listener實(shí)現(xiàn)的策略來(lái)決斷事務(wù)
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 構(gòu)造事務(wù)消息的生產(chǎn)者
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 設(shè)置事務(wù)決斷處理類
producer.setTransactionCheckListener(transactionCheckListener);
// 本地事務(wù)的處理邏輯,相當(dāng)于示例中檢查Bob賬戶并扣錢的邏輯
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 構(gòu)造MSG,省略構(gòu)造參數(shù)
Message msg = new Message(......);
// 發(fā)送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();

接著查看sendMessageInTransaction方法的源碼,總共分為3個(gè)階段:發(fā)送Prepared消息、執(zhí)行本地事務(wù)、發(fā)送確認(rèn)消息。

//  ================================事務(wù)消息的發(fā)送過(guò)程=============================================public TransactionSendResult sendMessageInTransaction(.....)  {    // 邏輯代碼,非實(shí)際代碼
    // 1.發(fā)送消息
    sendResult = this.send(msg);    // sendResult.getSendStatus() == SEND_OK
    // 2.如果消息發(fā)送成功,處理與消息關(guān)聯(lián)的本地事務(wù)單元
    LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);    // 3.結(jié)束事務(wù)
    this.endTransaction(sendResult, localTransactionState, localException);
}

endTransaction方法會(huì)將請(qǐng)求發(fā)往broker(mq server)去更新事務(wù)消息的最終狀態(tài):

  1. 根據(jù)sendResult找到Prepared消息 ,sendResult包含事務(wù)消息的ID

  2. 根據(jù)localTransaction更新消息的最終狀態(tài)

如果endTransaction方法執(zhí)行失敗,數(shù)據(jù)沒(méi)有發(fā)送到broker,導(dǎo)致事務(wù)消息的 狀態(tài)更新失敗,broker會(huì)有回查線程定時(shí)(默認(rèn)1分鐘)掃描每個(gè)存儲(chǔ)事務(wù)狀態(tài)的表格文件,如果是已經(jīng)提交或者回滾的消息直接跳過(guò),如果是prepared狀態(tài)則會(huì)向Producer發(fā)起CheckTransaction請(qǐng)求,Producer會(huì)調(diào)用DefaultMQProducerImpl.checkTransactionState()方法來(lái)處理broker的定時(shí)回調(diào)請(qǐng)求,而checkTransactionState會(huì)調(diào)用我們的事務(wù)設(shè)置的決斷方法來(lái)決定是回滾事務(wù)還是繼續(xù)執(zhí)行,最后調(diào)用endTransactionOneway讓broker來(lái)更新消息的最終狀態(tài)。

再回到轉(zhuǎn)賬的例子,如果Bob的賬戶的余額已經(jīng)減少,且消息已經(jīng)發(fā)送成功,Smith端開(kāi)始消費(fèi)這條消息,這個(gè)時(shí)候就會(huì)出現(xiàn)消費(fèi)失敗和消費(fèi)超時(shí)兩個(gè)問(wèn)題,解決超時(shí)問(wèn)題的思路就是一直重試,直到消費(fèi)端消費(fèi)消息成功,整個(gè)過(guò)程中有可能會(huì)出現(xiàn)消息重復(fù)的問(wèn)題,按照前面的思路解決即可。

分布式開(kāi)放消息系統(tǒng)RocketMQ的原理分析

這樣基本上可以解決消費(fèi)端超時(shí)問(wèn)題,但是如果消費(fèi)失敗怎么辦?阿里提供給我們的解決方法是:人工解決。大家可以考慮一下,按照事務(wù)的流程,因?yàn)槟撤N原因Smith加款失敗,那么需要回滾整個(gè)流程。如果消息系統(tǒng)要實(shí)現(xiàn)這個(gè)回滾流程的話,系統(tǒng)復(fù)雜度將大大提升,且很容易出現(xiàn)Bug,估計(jì)出現(xiàn)Bug的概率會(huì)比消費(fèi)失敗的概率大很多。這也是RocketMQ目前暫時(shí)沒(méi)有解決這個(gè)問(wèn)題的原因,在設(shè)計(jì)實(shí)現(xiàn)消息系統(tǒng)時(shí),我們需要衡量是否值得花這么大的代價(jià)來(lái)解決這樣一個(gè)出現(xiàn)概率非常小的問(wèn)題,這也是大家在解決疑難問(wèn)題時(shí)需要多多思考的地方。

        20160321補(bǔ)充:在3.2.6版本中移除了事務(wù)消息的實(shí)現(xiàn),所以此版本不支持事務(wù)消息,具體情況請(qǐng)參考rocketmq的issues:

        https://github.com/alibaba/RocketMQ/issues/65

        https://github.com/alibaba/RocketMQ/issues/138

        https://github.com/alibaba/RocketMQ/issues/156

四、Producer如何發(fā)送消息

Producer輪詢某topic下的所有隊(duì)列的方式來(lái)實(shí)現(xiàn)發(fā)送方的負(fù)載均衡,如下圖所示:

分布式開(kāi)放消息系統(tǒng)RocketMQ的原理分析

首先分析一下RocketMQ的客戶端發(fā)送消息的源碼:

// 構(gòu)造Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 初始化Producer,整個(gè)應(yīng)用生命周期內(nèi),只需要初始化1次
producer.start();
// 構(gòu)造Message
Message msg = new Message("TopicTest1",// topic
                        "TagA",// tag:給消息打標(biāo)簽,用于區(qū)分一類消息,可為null
                        "OrderID188",// key:自定義Key,可以用于去重,可為null
                        ("Hello MetaQ").getBytes());// body:消息內(nèi)容
// 發(fā)送消息并返回結(jié)果
SendResult sendResult = producer.send(msg);
// 清理資源,關(guān)閉網(wǎng)絡(luò)連接,注銷自己
producer.shutdown();

在整個(gè)應(yīng)用生命周期內(nèi),生產(chǎn)者需要調(diào)用一次start方法來(lái)初始化,初始化主要完成的任務(wù)有:

  1. 如果沒(méi)有指定namesrv地址,將會(huì)自動(dòng)尋址

  2. 啟動(dòng)定時(shí)任務(wù):更新namesrv地址、從namsrv更新topic路由信息、清理已經(jīng)掛掉的broker、向所有broker發(fā)送心跳...

  3. 啟動(dòng)負(fù)載均衡的服務(wù)

初始化完成后,開(kāi)始發(fā)送消息,發(fā)送消息的主要代碼如下:

private SendResult sendDefaultImpl(Message msg,......) {
    // 檢查Producer的狀態(tài)是否是RUNNING
    this.makeSureStateOK();
    // 檢查msg是否合法:是否為null、topic,body是否為空、body是否超長(zhǎng)
    Validators.checkMessage(msg, this.defaultMQProducer);
    // 獲取topic路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    // 從路由信息中選擇一個(gè)消息隊(duì)列
    MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
    // 將消息發(fā)送到該隊(duì)列上去
    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}

代碼中需要關(guān)注的兩個(gè)方法tryToFindTopicPublishInfo和selectOneMessageQueue。前面說(shuō)過(guò)在producer初始化時(shí),會(huì)啟動(dòng)定時(shí)任務(wù)獲取路由信息并更新到本地緩存,所以tryToFindTopicPublishInfo會(huì)首先從緩存中獲取topic路由信息,如果沒(méi)有獲取到,則會(huì)自己去namesrv獲取路由信息。selectOneMessageQueue方法通過(guò)輪詢的方式,返回一個(gè)隊(duì)列,以達(dá)到負(fù)載均衡的目的。

如果Producer發(fā)送消息失敗,會(huì)自動(dòng)重試,重試的策略:

  1. 重試次數(shù) < retryTimesWhenSendFailed(可配置)

  2. 總的耗時(shí)(包含重試n次的耗時(shí)) < sendMsgTimeout(發(fā)送消息時(shí)傳入的參數(shù))

  3. 同時(shí)滿足上面兩個(gè)條件后,Producer會(huì)選擇另外一個(gè)隊(duì)列發(fā)送消息

五、消息存儲(chǔ)

RocketMQ的消息存儲(chǔ)是由consume queue和commit log配合完成的。

1、Consume Queue

consume queue是消息的邏輯隊(duì)列,相當(dāng)于字典的目錄,用來(lái)指定消息在物理文件commit log上的位置。

我們可以在配置中指定consumequeue與commitlog存儲(chǔ)的目錄。每個(gè)topic下的每個(gè)queue都有一個(gè)對(duì)應(yīng)的consumequeue文件,比如:

${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}

Consume Queue文件組織,如圖所示:

分布式開(kāi)放消息系統(tǒng)RocketMQ的原理分析

  1. 根據(jù)topic和queueId來(lái)組織文件,圖中TopicA有兩個(gè)隊(duì)列0,1,那么TopicA和QueueId=0組成一個(gè)ConsumeQueue,TopicA和QueueId=1組成另一個(gè)ConsumeQueue。

  2. 按照消費(fèi)端的GroupName來(lái)分組重試隊(duì)列,如果消費(fèi)端消費(fèi)失敗,消息將被發(fā)往重試隊(duì)列中,比如圖中的%RETRY%ConsumerGroupA。

  3. 按照消費(fèi)端的GroupName來(lái)分組死信隊(duì)列,如果消費(fèi)端消費(fèi)失敗,并重試指定次數(shù)后,仍然失敗,則發(fā)往死信隊(duì)列,比如圖中的%DLQ%ConsumerGroupA。

        死信隊(duì)列(Dead Letter Queue)一般用于存放由于某種原因無(wú)法傳遞的消息,比如處理失敗或者已經(jīng)過(guò)期的消息。

Consume Queue中存儲(chǔ)單元是一個(gè)20字節(jié)定長(zhǎng)的二進(jìn)制數(shù)據(jù),順序?qū)戫樞蜃x,如下圖所示:分布式開(kāi)放消息系統(tǒng)RocketMQ的原理分析

  1. CommitLog Offset是指這條消息在Commit Log文件中的實(shí)際偏移量

  2. Size存儲(chǔ)中消息的大小

  3. Message Tag HashCode存儲(chǔ)消息的Tag的哈希值:主要用于訂閱時(shí)消息過(guò)濾(訂閱時(shí)如果指定了Tag,會(huì)根據(jù)HashCode來(lái)快速查找到訂閱的消息)

2、Commit Log

CommitLog:消息存放的物理文件,每臺(tái)broker上的commitlog被本機(jī)所有的queue共享,不做任何區(qū)分。文件的默認(rèn)位置如下,仍然可通過(guò)配置文件修改:

${user.home} \store\${commitlog}\${fileName}

CommitLog的消息存儲(chǔ)單元長(zhǎng)度不固定,文件順序?qū)懀S機(jī)讀。消息的存儲(chǔ)結(jié)構(gòu)如下表所示,按照編號(hào)順序以及編號(hào)對(duì)應(yīng)的內(nèi)容依次存儲(chǔ)。

分布式開(kāi)放消息系統(tǒng)RocketMQ的原理分析

3、消息存儲(chǔ)實(shí)現(xiàn)

消息存儲(chǔ)實(shí)現(xiàn),比較復(fù)雜,也值得大家深入了解,后面會(huì)單獨(dú)成文來(lái)分析(目前正在收集素材),這小節(jié)只以代碼說(shuō)明一下具體的流程。

// Set the storage timemsg.setStoreTimestamp(System.currentTimeMillis());// Set the message body BODY CRC (consider the most appropriate settingmsg.setBodyCRC(UtilAll.crc32(msg.getBody()));
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();synchronized (this) {    long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();    // Here settings are stored timestamp, in order to ensure an orderly global
    msg.setStoreTimestamp(beginLockTimestamp);    // MapedFile:操作物理文件在內(nèi)存中的映射以及將內(nèi)存數(shù)據(jù)持久化到物理文件中
    MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();    // 將Message追加到文件commitlog
    result = mapedFile.appendMessage(msg, this.appendMessageCallback);    switch (result.getStatus()) {    case PUT_OK:break;    case END_OF_FILE:         // Create a new file, re-write the message
         mapedFile = this.mapedFileQueue.getLastMapedFile();
         result = mapedFile.appendMessage(msg, this.appendMessageCallback);     break;
     DispatchRequest dispatchRequest = new DispatchRequest(
                topic,// 1
                queueId,// 2
                result.getWroteOffset(),// 3
                result.getWroteBytes(),// 4
                tagsCode,// 5
                msg.getStoreTimestamp(),// 6
                result.getLogicsOffset(),// 7
                msg.getKeys(),// 8
                /**
                 * Transaction
                 */
                msg.getSysFlag(),// 9
                msg.getPreparedTransactionOffset());// 10
    // 1.分發(fā)消息位置到ConsumeQueue
    // 2.分發(fā)到IndexService建立索引
    this.defaultMessageStore.putDispatchRequest(dispatchRequest);
}

4、消息的索引文件

如果一個(gè)消息包含key值的話,會(huì)使用IndexFile存儲(chǔ)消息索引,文件的內(nèi)容結(jié)構(gòu)如圖:

分布式開(kāi)放消息系統(tǒng)RocketMQ的原理分析

索引文件主要用于根據(jù)key來(lái)查詢消息的,流程主要是:

  1. 根據(jù)查詢的 key 的 hashcode%slotNum 得到具體的槽的位置(slotNum 是一個(gè)索引文件里面包含的最大槽的數(shù)目,例如圖中所示 slotNum=5000000)

  2. 根據(jù) slotValue(slot 位置對(duì)應(yīng)的值)查找到索引項(xiàng)列表的最后一項(xiàng)(倒序排列,slotValue 總是指向最新的一個(gè)索引項(xiàng))

  3. 遍歷索引項(xiàng)列表返回查詢時(shí)間范圍內(nèi)的結(jié)果集(默認(rèn)一次最大返回的 32 條記錄)

六、消息訂閱

RocketMQ消息訂閱有兩種模式,一種是Push模式,即MQServer主動(dòng)向消費(fèi)端推送;另外一種是Pull模式,即消費(fèi)端在需要時(shí),主動(dòng)到MQServer拉取。但在具體實(shí)現(xiàn)時(shí),Push和Pull模式都是采用消費(fèi)端主動(dòng)拉取的方式。

首先看下消費(fèi)端的負(fù)載均衡:

分布式開(kāi)放消息系統(tǒng)RocketMQ的原理分析

消費(fèi)端會(huì)通過(guò)RebalanceService線程,10秒鐘做一次基于topic下的所有隊(duì)列負(fù)載:

  1. 遍歷Consumer下的所有topic,然后根據(jù)topic訂閱所有的消息

  2. 獲取同一topic和Consumer Group下的所有Consumer

  3. 然后根據(jù)具體的分配策略來(lái)分配消費(fèi)隊(duì)列,分配的策略包含:平均分配、消費(fèi)端配置等

如同上圖所示:如果有 5 個(gè)隊(duì)列,2 個(gè) consumer,那么第一個(gè) Consumer 消費(fèi) 3 個(gè)隊(duì)列,第二 consumer 消費(fèi) 2 個(gè)隊(duì)列。這里采用的就是平均分配策略,它類似于分頁(yè)的過(guò)程,TOPIC下面的所有queue就是記錄,Consumer的個(gè)數(shù)就相當(dāng)于總的頁(yè)數(shù),那么每頁(yè)有多少條記錄,就類似于某個(gè)Consumer會(huì)消費(fèi)哪些隊(duì)列。

通過(guò)這樣的策略來(lái)達(dá)到大體上的平均消費(fèi),這樣的設(shè)計(jì)也可以很方面的水平擴(kuò)展Consumer來(lái)提高消費(fèi)能力。

消費(fèi)端的Push模式是通過(guò)長(zhǎng)輪詢的模式來(lái)實(shí)現(xiàn)的,就如同下圖:

分布式開(kāi)放消息系統(tǒng)RocketMQ的原理分析

Consumer端每隔一段時(shí)間主動(dòng)向broker發(fā)送拉消息請(qǐng)求,broker在收到Pull請(qǐng)求后,如果有消息就立即返回?cái)?shù)據(jù),Consumer端收到返回的消息后,再回調(diào)消費(fèi)者設(shè)置的Listener方法。如果broker在收到Pull請(qǐng)求時(shí),消息隊(duì)列里沒(méi)有數(shù)據(jù),broker端會(huì)阻塞請(qǐng)求直到有數(shù)據(jù)傳遞或超時(shí)才返回。

當(dāng)然,Consumer端是通過(guò)一個(gè)線程將阻塞隊(duì)列LinkedBlockingQueue<PullRequest>中的PullRequest發(fā)送到broker拉取消息,以防止Consumer一致被阻塞。而B(niǎo)roker端,在接收到Consumer的PullRequest時(shí),如果發(fā)現(xiàn)沒(méi)有消息,就會(huì)把PullRequest扔到ConcurrentHashMap中緩存起來(lái)。broker在啟動(dòng)時(shí),會(huì)啟動(dòng)一個(gè)線程不停的從ConcurrentHashMap取出PullRequest檢查,直到有數(shù)據(jù)返回。

七、RocketMQ的其他特性

前面的6個(gè)特性都是基本上都是點(diǎn)到為止,想要深入了解,還需要大家多多查看源碼,多多在實(shí)際中運(yùn)用。當(dāng)然除了已經(jīng)提到的特性外,RocketMQ還支持:

  1. 定時(shí)消息

  2. 消息的刷盤策略

  3. 主動(dòng)同步策略:同步雙寫(xiě)、異步復(fù)制

  4. 海量消息堆積能力

  5. 高效通信

  6. .......

其中涉及到的很多設(shè)計(jì)思路和解決方法都值得我們深入研究:

  1. 消息的存儲(chǔ)設(shè)計(jì):既要滿足海量消息的堆積能力,又要滿足極快的查詢效率,還要保證寫(xiě)入的效率。

  2. 高效的通信組件設(shè)計(jì):高吞吐量,毫秒級(jí)的消息投遞能力都離不開(kāi)高效的通信。

  3. .......

RocketMQ最佳實(shí)踐

一、Producer最佳實(shí)踐

1、一個(gè)應(yīng)用盡可能用一個(gè) Topic,消息子類型用 tags 來(lái)標(biāo)識(shí),tags 可以由應(yīng)用自由設(shè)置。只有發(fā)送消息設(shè)置了tags,消費(fèi)方在訂閱消息時(shí),才可以利用 tags 在 broker 做消息過(guò)濾。

2、每個(gè)消息在業(yè)務(wù)層面的唯一標(biāo)識(shí)碼,要設(shè)置到 keys 字段,方便將來(lái)定位消息丟失問(wèn)題。由于是哈希索引,請(qǐng)務(wù)必保證 key 盡可能唯一,這樣可以避免潛在的哈希沖突。

3、消息發(fā)送成功或者失敗,要打印消息日志,務(wù)必要打印 sendresult 和 key 字段。

4、對(duì)于消息不可丟失應(yīng)用,務(wù)必要有消息重發(fā)機(jī)制。例如:消息發(fā)送失敗,存儲(chǔ)到數(shù)據(jù)庫(kù),能有定時(shí)程序嘗試重發(fā)或者人工觸發(fā)重發(fā)。

5、某些應(yīng)用如果不關(guān)注消息是否發(fā)送成功,請(qǐng)直接使用sendOneWay方法發(fā)送消息。

二、Consumer最佳實(shí)踐

1、消費(fèi)過(guò)程要做到冪等(即消費(fèi)端去重)

2、盡量使用批量方式消費(fèi)方式,可以很大程度上提高消費(fèi)吞吐量。

3、優(yōu)化每條消息消費(fèi)過(guò)程

三、其他配置

線上應(yīng)該關(guān)閉autoCreateTopicEnable,即在配置文件中將其設(shè)置為false。

RocketMQ在發(fā)送消息時(shí),會(huì)首先獲取路由信息。如果是新的消息,由于MQServer上面還沒(méi)有創(chuàng)建對(duì)應(yīng)的Topic,這個(gè)時(shí)候,如果上面的配置打開(kāi)的話,會(huì)返回默認(rèn)TOPIC的(RocketMQ會(huì)在每臺(tái)broker上面創(chuàng)建名為TBW102的TOPIC)路由信息,然后Producer會(huì)選擇一臺(tái)Broker發(fā)送消息,選中的broker在存儲(chǔ)消息時(shí),發(fā)現(xiàn)消息的topic還沒(méi)有創(chuàng)建,就會(huì)自動(dòng)創(chuàng)建topic。后果就是:以后所有該TOPIC的消息,都將發(fā)送到這臺(tái)broker上,達(dá)不到負(fù)載均衡的目的。

所以基于目前RocketMQ的設(shè)計(jì),建議關(guān)閉自動(dòng)創(chuàng)建TOPIC的功能,然后根據(jù)消息量的大小,手動(dòng)創(chuàng)建TOPIC。

RocketMQ設(shè)計(jì)相關(guān)

RocketMQ的設(shè)計(jì)假定:

  1. 每臺(tái)PC機(jī)器都可能宕機(jī)不可服務(wù)

  2. 任意集群都有可能處理能力不足

  3. 最壞的情況一定會(huì)發(fā)生

  4. 內(nèi)網(wǎng)環(huán)境需要低延遲來(lái)提供最佳用戶體驗(yàn)

RocketMQ的關(guān)鍵設(shè)計(jì):

  1. 分布式集群化

  2. 強(qiáng)數(shù)據(jù)安全

  3. 海量數(shù)據(jù)堆積

  4. 毫秒級(jí)投遞延遲(推拉模式)

這是RocketMQ在設(shè)計(jì)時(shí)的假定前提以及需要到達(dá)的效果。我想這些假定適用于所有的系統(tǒng)設(shè)計(jì)。隨著我們系統(tǒng)的服務(wù)的增多,每位開(kāi)發(fā)者都要注意自己的程序是否存在單點(diǎn)故障,如果掛了應(yīng)該怎么恢復(fù)、能不能很好的水平擴(kuò)展、對(duì)外的接口是否足夠高效、自己管理的數(shù)據(jù)是否足夠安全...... 多多規(guī)范自己的設(shè)計(jì),才能開(kāi)發(fā)出高效健壯的程序。

關(guān)于“分布式開(kāi)放消息系統(tǒng)RocketMQ的原理分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。

文章題目:分布式開(kāi)放消息系統(tǒng)RocketMQ的原理分析
本文URL:http://bm7419.com/article6/pcdeog.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供建站公司品牌網(wǎng)站制作、網(wǎng)站制作、企業(yè)網(wǎng)站制作、自適應(yīng)網(wǎng)站、App設(shè)計(jì)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都app開(kāi)發(fā)公司