RocketMQ如何獲取指定消息-創(chuàng)新互聯(lián)

小編給大家分享一下RocketMQ如何獲取指定消息,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

創(chuàng)新互聯(lián)建站專注于新余網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗(yàn)。 熱誠為您提供新余營銷型網(wǎng)站建設(shè),新余網(wǎng)站制作、新余網(wǎng)頁設(shè)計(jì)、新余網(wǎng)站官網(wǎng)定制、成都小程序開發(fā)服務(wù),打造新余網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供新余網(wǎng)站排名全網(wǎng)營銷落地服務(wù)。

概要

消息查詢是什么?

消息查詢就是根據(jù)用戶提供的msgId從MQ中取出該消息

RocketMQ如果有多個節(jié)點(diǎn)如何查詢?

問題:RocketMQ分布式結(jié)構(gòu)中,數(shù)據(jù)分散在各個節(jié)點(diǎn),即便是同一Topic的數(shù)據(jù),也未必都在一個broker上??蛻舳嗽趺粗罃?shù)據(jù)該去哪個節(jié)點(diǎn)上查?

猜想1:逐個訪問broker節(jié)點(diǎn)查詢數(shù)據(jù)

猜想2:有某種數(shù)據(jù)中心存在,該中心知道所有消息存儲的位置,只要向該中心查詢即可得到消息具體位置,進(jìn)而取得消息內(nèi)容

實(shí)際:

1.消息Id中含有消息所在的broker的地址信息(IP\Port)以及該消息在CommitLog中的偏移量。

2.客戶端實(shí)現(xiàn)會從msgId字符串中解析出broker地址,向指定broker節(jié)查詢消息。

問題:CommitLog文件有多個,只有偏移量估計(jì)不能確定在哪個文件吧?

實(shí)際:單個Broker節(jié)點(diǎn)內(nèi)offset是全局唯一的,不是每個CommitLog文件的偏移量都是從0開始的。單個節(jié)點(diǎn)內(nèi)所有CommitLog文件共用一套偏移量,每個文件的文件名為其第一個消息的偏移量。所以可以根據(jù)偏移量和文件名確定CommitLog文件。

源碼閱讀

0.使用方式

MessageExt  msg = consumer.viewMessage(msgId);

1.消息ID解析

這個了解下就可以了

public class MessageId {
 private SocketAddress address;
 private long offset;

 public MessageId(SocketAddress address, long offset) {
  this.address = address;
  this.offset = offset;
 }

 //get-set
}

//from MQAdminImpl.java
public MessageExt viewMessage(
 String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {

 MessageId messageId = null;
 try {
  //從msgId字符串中解析出address和offset
  //address = ip:port
  //offset為消息在CommitLog文件中的偏移量
  messageId = MessageDecoder.decodeMessageId(msgId);
 } catch (Exception e) {
  throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message.");
 }
 return this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()),
  messageId.getOffset(), timeoutMillis);
}

//from MessageDecoder.java
public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {
 SocketAddress address;
 long offset;
 //ipv4和ipv6的區(qū)別
 //如果msgId總長度超過32字符,則為ipv6
 int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2;

 byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength));
 byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8));
 ByteBuffer bb = ByteBuffer.wrap(port);
 int portInt = bb.getInt(0);
 address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);

 // offset
 byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8 + 16));
 bb = ByteBuffer.wrap(data);
 offset = bb.getLong(0);

 return new MessageId(address, offset);
}

本文標(biāo)題:RocketMQ如何獲取指定消息-創(chuàng)新互聯(lián)
本文URL:http://bm7419.com/article44/dsccee.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站營銷、搜索引擎優(yōu)化商城網(wǎng)站、云服務(wù)器、手機(jī)網(wǎng)站建設(shè)、做網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都app開發(fā)公司