這篇文章給大家介紹SpringBoot中怎么整合RocketMQ,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對(duì)大家能有所幫助。
工農(nóng)網(wǎng)站制作公司哪家好,找創(chuàng)新互聯(lián)建站!從網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開(kāi)發(fā)、APP開(kāi)發(fā)、成都響應(yīng)式網(wǎng)站建設(shè)公司等網(wǎng)站項(xiàng)目制作,到程序開(kāi)發(fā),運(yùn)營(yíng)維護(hù)。創(chuàng)新互聯(lián)建站于2013年創(chuàng)立到現(xiàn)在10年的時(shí)間,我們擁有了豐富的建站經(jīng)驗(yàn)和運(yùn)維經(jīng)驗(yàn),來(lái)保證我們的工作的順利進(jìn)行。專注于網(wǎng)站建設(shè)就選創(chuàng)新互聯(lián)建站。
RocketMQ機(jī)構(gòu)及概念
RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負(fù)責(zé)生產(chǎn)消息,Consumer 負(fù)責(zé)消費(fèi)消息,Broker 負(fù)責(zé)存儲(chǔ)消息。Broker 在實(shí)際部署過(guò)程中對(duì)應(yīng)一臺(tái)服務(wù)器,每個(gè) Broker 可以存儲(chǔ)多個(gè)Topic的消息,每個(gè)Topic的消息也可以分片存儲(chǔ)于不同的 Broker。Message Queue 用于存儲(chǔ)消息的物理地址,每個(gè)Topic中的消息地址存儲(chǔ)于多個(gè) Message Queue 中。ConsumerGroup 由多個(gè)Consumer 實(shí)例構(gòu)成。
負(fù)責(zé)生產(chǎn)消息,一般由業(yè)務(wù)系統(tǒng)負(fù)責(zé)生產(chǎn)消息。一個(gè)消息生產(chǎn)者會(huì)把業(yè)務(wù)應(yīng)用系統(tǒng)里產(chǎn)生的消息發(fā)送到broker服務(wù)器。RocketMQ提供多種發(fā)送方式,同步發(fā)送、異步發(fā)送、順序發(fā)送、單向發(fā)送。同步和異步方式均需要Broker返回確認(rèn)信息,單向發(fā)送不需要。
負(fù)責(zé)消費(fèi)消息,一般是后臺(tái)系統(tǒng)負(fù)責(zé)異步消費(fèi)。一個(gè)消息消費(fèi)者會(huì)從Broker服務(wù)器拉取消息、并將其提供給應(yīng)用程序。從用戶應(yīng)用的角度而言提供了兩種消費(fèi)形式:拉取式消費(fèi)、推動(dòng)式消費(fèi)。
表示一類消息的集合,每個(gè)主題包含若干條消息,每條消息只能屬于一個(gè)主題,是RocketMQ進(jìn)行消息訂閱的基本單位。
消息中轉(zhuǎn)角色,負(fù)責(zé)存儲(chǔ)消息、轉(zhuǎn)發(fā)消息。代理服務(wù)器在RocketMQ系統(tǒng)中負(fù)責(zé)接收從生產(chǎn)者發(fā)送來(lái)的消息并存儲(chǔ)、同時(shí)為消費(fèi)者的拉取請(qǐng)求作準(zhǔn)備。代理服務(wù)器也存儲(chǔ)消息相關(guān)的元數(shù)據(jù),包括消費(fèi)者組、消費(fèi)進(jìn)度偏移和主題和隊(duì)列消息等。如下圖:
名稱服務(wù)充當(dāng)路由消息的提供者。生產(chǎn)者或消費(fèi)者能夠通過(guò)名字服務(wù)查找各主題相應(yīng)的Broker IP列表。多個(gè)Namesrv實(shí)例組成集群,但相互獨(dú)立,沒(méi)有信息交換。
Consumer消費(fèi)的一種類型,應(yīng)用通常主動(dòng)調(diào)用Consumer的拉消息方法從Broker服務(wù)器拉消息、主動(dòng)權(quán)由應(yīng)用控制。一旦獲取了批量消息,應(yīng)用就會(huì)啟動(dòng)消費(fèi)過(guò)程。
Consumer消費(fèi)的一種類型,該模式下Broker收到數(shù)據(jù)后會(huì)主動(dòng)推送給消費(fèi)端,該消費(fèi)模式一般實(shí)時(shí)性較高。
同一類Producer的集合,這類Producer發(fā)送同一類消息且發(fā)送邏輯一致。如果發(fā)送的是事務(wù)消息且原始生產(chǎn)者在發(fā)送之后崩潰,則Broker服務(wù)器會(huì)聯(lián)系同一生產(chǎn)者組的其他生產(chǎn)者實(shí)例以提交或回溯消費(fèi)。
同一類Consumer的集合,這類Consumer通常消費(fèi)同一類消息且消費(fèi)邏輯一致。消費(fèi)者組使得在消息消費(fèi)方面,實(shí)現(xiàn)負(fù)載均衡和容錯(cuò)的目標(biāo)變得非常容易。要注意的是,消費(fèi)者組的消費(fèi)者實(shí)例必須訂閱完全相同的Topic。RocketMQ 支持兩種消息模式:集群消費(fèi)(Clustering)和廣播消費(fèi)(Broadcasting)。
集群消費(fèi)模式下,相同Consumer Group的每個(gè)Consumer實(shí)例平均分?jǐn)傁ⅰ?/p>
廣播消費(fèi)模式下,相同Consumer Group的每個(gè)Consumer實(shí)例都接收全量的消息。
普通順序消費(fèi)模式下,消費(fèi)者通過(guò)同一個(gè)消費(fèi)隊(duì)列收到的消息是有順序的,不同消息隊(duì)列收到的消息則可能是無(wú)順序的。
嚴(yán)格順序消息模式下,消費(fèi)者收到的所有消息均是有順序的。
消息系統(tǒng)所傳輸信息的物理載體,生產(chǎn)和消費(fèi)數(shù)據(jù)的最小單位,每條消息必須屬于一個(gè)主題。RocketMQ中每個(gè)消息擁有唯一的Message ID,且可以攜帶具有業(yè)務(wù)標(biāo)識(shí)的Key。系統(tǒng)提供了通過(guò)Message ID和Key查詢消息的功能。
為消息設(shè)置的標(biāo)志,用于同一主題下區(qū)分不同類型的消息。來(lái)自同一業(yè)務(wù)單元的消息,可以根據(jù)不同業(yè)務(wù)目的在同一主題下設(shè)置不同標(biāo)簽。標(biāo)簽?zāi)軌蛴行У乇3执a的清晰度和連貫性,并優(yōu)化RocketMQ提供的查詢系統(tǒng)。消費(fèi)者可以根據(jù)Tag實(shí)現(xiàn)對(duì)不同子主題的不同消費(fèi)邏輯,實(shí)現(xiàn)更好的擴(kuò)展性。
ActiveMQ,Kafka,RocketMQ對(duì)比:
1 下載RocketMQ
2 配置環(huán)境變量
3 啟動(dòng)Name Server
4 啟動(dòng) Broker
5 通過(guò)命令行發(fā)送 & 接收消息
設(shè)置環(huán)境變量:
C:\Users\MSI-NB>set NAMESRV_ADDR=localhost:9876
發(fā)送消息:
C:\Users\MSI-NB>tools org.apache.rocketmq.example.quickstart.Producer
接收消息:
依賴:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.0</version> </dependency>
配置文件:
rocketmq: nameServer: localhost:9876 producer: group: demo-mq
生產(chǎn)者:
@Service public class ProducerService { @Resource private RocketMQTemplate rocketMQTemplate ; public void send(String message) { rocketMQTemplate.convertAndSend("test-topic", message); } }
消費(fèi)者:
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group") @Component public class ConsumerListener implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println("接收到消息:" + message) ; } }
這里的topic要和發(fā)送端設(shè)置的一致,consumerGroup可隨意。
發(fā)送接口:
@RestController @RequestMapping("/messages") public class MessageController { @Resource private ProducerService ps ; @GetMapping("") public Object send(String message) { ps.send(message) ; return "send success" ; } }
測(cè)試:
發(fā)送時(shí):
rocketMQTemplate.convertAndSend("test-topic:tag1", message);
接收時(shí):
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1")
selectorExpression:默認(rèn)是 “*” ,這里指定與發(fā)送的一致;
這里看下源碼:
RocketMQUtil.java
這里topic與tags是用冒號(hào) ":" 分割的,tags就是取的數(shù)組的第二個(gè)。
關(guān)于SpringBoot中怎么整合RocketMQ就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。
當(dāng)前題目:SpringBoot中怎么整合RocketMQ
網(wǎng)頁(yè)網(wǎng)址:http://bm7419.com/article6/goccog.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站制作、定制網(wǎng)站、域名注冊(cè)、移動(dòng)網(wǎng)站建設(shè)、定制開(kāi)發(fā)、外貿(mào)網(wǎng)站建設(shè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)
營(yíng)銷型網(wǎng)站建設(shè)知識(shí)