RocketMQ客戶端PUSH消費如何實現(xiàn)負(fù)載均衡

這篇文章將為大家詳細(xì)講解有關(guān)RocketMQ客戶端PUSH消費如何實現(xiàn)負(fù)載均衡,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

創(chuàng)新互聯(lián)是一家集網(wǎng)站建設(shè),湘潭縣企業(yè)網(wǎng)站建設(shè),湘潭縣品牌網(wǎng)站建設(shè),網(wǎng)站定制,湘潭縣網(wǎng)站建設(shè)報價,網(wǎng)絡(luò)營銷,網(wǎng)絡(luò)優(yōu)化,湘潭縣網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強企業(yè)競爭力??沙浞譂M足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時我們時刻保持專業(yè)、時尚、前沿,時刻以成就客戶成長自我,堅持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實用型網(wǎng)站。

一、問題思考

1.主題隊列是如何分配的?
2.什么時候會進(jìn)行負(fù)載均衡?
3.負(fù)載均衡后是否會導(dǎo)致消息重復(fù)消費?

二、調(diào)用鏈條

1.初始化鏈條

@1 DefaultMQPushConsumerImpl#start
this.mQClientFactory
= MQClientManager.getInstance().getAndCreateMQClientInstance
@2 MQClientManager#getAndCreateMQClientInstance
instance = new MQClientInstance
@3 MQClientInstance#MQClientInstance
this.rebalanceService = new RebalanceService

2.啟動鏈條

@1 DefaultMQPushConsumerImpl#start
mQClientFactory.start()
@2 MQClientInstance#start
this.rebalanceService.start

小結(jié):從初始化鏈和調(diào)用鏈可以看出RebalanceService為線程類,隨著消費啟動時而啟動,消費不退出則一直運行著。

三、負(fù)載均衡流程


1.負(fù)載均衡鏈條

@1 RebalanceService#run
mqClientFactory.doRebalance()
@2 MQClientInstance#doRebalance
impl.doRebalance()
@3 DefaultMQPushConsumerImpl#doRebalance
this.rebalanceImpl.doRebalance
@4 RebalanceImpl#doRebalance
rebalanceByTopic

2.負(fù)載均衡流程

RocketMQ客戶端PUSH消費如何實現(xiàn)負(fù)載均衡

小結(jié):在負(fù)載均衡時,會循環(huán)該消費組訂閱的所有Topic都會執(zhí)行負(fù)載均衡。


3.更新緩存processQueue流程

RocketMQ客戶端PUSH消費如何實現(xiàn)負(fù)載均衡

小結(jié):
1. 更新緩存時如果消費組訂閱的隊列不在新分配的隊列集合中或者隊列拉取時間超時失效,則將快照ProcessQueue設(shè)置為丟棄。
2. 消費拉取時判斷ProcessQueue為丟棄,則不再對該隊列拉取。
3. 順序消費時如果獲取消費鎖成功,表明此隊列空閑沒有被消費,此時向Broker發(fā)起解鎖請求,解鎖成功后將該隊列從緩存(processQueueTable)移除。
4. 順序消費時獲取鎖失敗,表明正在消費則不從processQueueTable移除,由于ProcessQueue設(shè)置為丟棄,在順序消費下次拉取時會退出該隊列的拉取請求。

4.向Broker發(fā)送心跳流程

RocketMQ客戶端PUSH消費如何實現(xiàn)負(fù)載均衡

5.隊列分配算法

負(fù)載均衡流程圖中對clientId和分區(qū)隊列的分配提交給分區(qū)算法執(zhí)行,那該算法是如何運作的呢?接口AllocateMessageQueueStrategy隊列分配策略提供五種分配算法實現(xiàn):

1.平均分配策略
   AllocateMessageQueueAveragely

2.環(huán)形分配策略
  AllocateMessageQueueAveragelyByCircle

3.機房分配策略
   AllocateMessageQueueByMachineRoom

4.一致性Hash分配策略

   AllocateMessageQueueConsistentHash

5.配置文件分配策略
  AllocateMessageQueueByConfig
除此之外可以自定義分配算法,實現(xiàn)接口接口即可,默認(rèn)使用平均分配算法,也是最常用的,下面以該算法看看如何工作的。

public List<MessageQueue> allocate
(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
List<MessageQueue> result = new ArrayList<MessageQueue>();
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize = mqAll.size() <= cidAll.size() ?
1 : (mod > 0 && index < mod ?
mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}

代碼不是很好閱讀,看下面驗證結(jié)果即可。


6.平均分配算法驗證

  • 只有一個clientId時分配情況
    會把1個Broker的16個分區(qū)全部分配給該客戶端,每隔20秒觸發(fā)一次負(fù)載均衡。
    currentCID=2.0.1.138@consumer01分到的隊列為0~15

----------2019-08-04 22:10:15-----------
currentCID=2.0.1.138@consumer01
index=0
mod=0
averageSize=16
startIndex=0
range=16
result=[MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=0], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=1], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=2], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=3], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=4], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=5], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=6], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=7], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=8], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=9], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=10], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=11], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=12], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=13], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=14], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=15]]
----------2019-08-04 22:10:35-----------
currentCID=2.0.1.138@consumer01
index=0
mod=0
averageSize=16
startIndex=0
range=16
result=[MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=0], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=1], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=2], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=3], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=4], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=5], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=6], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=7], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=8], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=9], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=10], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=11], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=12], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=13], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=14], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=15]]
  • 新加入第二個client時
    此時有兩個clinetId分別為2.0.1.138@consumer01和2.0.1.138@consumer02,1個 Broker16個分區(qū)的分配情況。
    currentCID=2.0.1.138@consumer01分到的分區(qū)為0~7
    currentCID=2.0.1.138@consumer02分到的分區(qū)為8~16

----------2019-08-04 22:12:25-----------
currentCID=2.0.1.138@consumer01
index=0
mod=0
averageSize=8
startIndex=0
range=8
result=[MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=0], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=1], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=2], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=3], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=4], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=5], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=6], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=7]]
----------2019-08-04 22:12:45-----------
currentCID=2.0.1.138@consumer02
index=1
mod=0
averageSize=8
startIndex=8
range=8
result=[MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=8], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=9], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=10], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=11], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=12], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=13], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=14], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=15]]
  • 新加入第三個client時
    此時有三個客戶端2.0.1.138@consumer01、2.0.1.138@consumer02、2.0.1.138@consumer03,1個Broker的16個隊列的分配情況。
    currentCID=2.0.1.138@consumer01分到的隊列0~5
    currentCID=2.0.1.138@consumer02分到的隊列6~10
    currentCID=2.0.1.138@consumer03分到的隊列11~15

----------2019-08-04 22:13:58-----------
currentCID=2.0.1.138@consumer01
index=0
mod=1
averageSize=6
startIndex=0
range=6
result=[MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=0], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=1], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=2], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=3], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=4], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=5]]
----------2019-08-04 22:14:18-----------
currentCID=2.0.1.138@consumer02
index=1
mod=1
averageSize=5
startIndex=6
range=5
result=[MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=6], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=7], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=8], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=9], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=10]]
----------2019-08-04 22:14:39-----------
currentCID=2.0.1.138@consumer03
index=2
mod=1
averageSize=5
startIndex=11
range=5
result=[MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=11], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=12], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=13], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=14], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=15]]
四、總結(jié)

1.主題隊列是如何分配的?

備注:見隊列分配算法,通常使用平均分配算法。

2.什么時候會進(jìn)行負(fù)載均衡?

備注:負(fù)載均衡線程每隔20秒執(zhí)行一次,當(dāng)有新客戶端退出或者加入或者新的Broker加入或掉線都會觸發(fā)重新負(fù)載均衡。


3.負(fù)載均衡后是否會導(dǎo)致消息重復(fù)消費?

備注:

情況1: 并發(fā)消費可能導(dǎo)致消息被重復(fù)消費,看以下代碼。

//并發(fā)消費對結(jié)果的處理

//ConsumeMessageConcurrentlyService#ConsumeRequest

if (!processQueue.isDropped()) {
 ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);

} else { //被丟棄,消費進(jìn)度不會更新

   log.warn("processQueue is dropped without process consume result.
   messageQueue=  {}, msgs={}", messageQueue, msgs);

如果負(fù)載均衡前已分配的隊列不在負(fù)載均衡后的新隊列集合中,會丟棄該隊列即:processQueue.isDropped()。而該隊列可能已經(jīng)被消費完了,在處理結(jié)果時被丟棄了,消費進(jìn)度沒有更新。別的消費客戶端重新拉取該隊列時造成重復(fù)消費。

情況2: 順序消費不會導(dǎo)致消息被重復(fù)消費

關(guān)于“RocketMQ客戶端PUSH消費如何實現(xiàn)負(fù)載均衡”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學(xué)到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

網(wǎng)站題目:RocketMQ客戶端PUSH消費如何實現(xiàn)負(fù)載均衡
URL分享:http://bm7419.com/article16/jddsgg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供靜態(tài)網(wǎng)站、、域名注冊網(wǎng)站設(shè)計公司、小程序開發(fā)、網(wǎng)站改版

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

綿陽服務(wù)器托管