RocketMQ

RocketMQ

本文檔主要是rocketmq實際代碼使用,常見詞語介紹等查看其他文檔

在越城等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都網(wǎng)站制作、做網(wǎng)站 網(wǎng)站設(shè)計制作定制網(wǎng)站開發(fā),公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),高端網(wǎng)站設(shè)計,營銷型網(wǎng)站,成都外貿(mào)網(wǎng)站制作,越城網(wǎng)站建設(shè)費用合理。

一 下載

http://rocketmq.apache.org/release_notes/release-notes-4.3.2/ 二進制文件下載地址,下載后可以直接解壓運行

https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.2/rocketmq-all-4.3.2-source-release.zip 源碼方式下載地址, 下載后需要自己打包

二 啟動

2.1 啟動nameserver

進入rocketmq的bin目錄

nohup sh mqnamesrv &

2.2 啟動broker server

進入bin目錄

nohup sh mqbroker -n localhost:9876? autoCreateTopicEnable=true &
集群方式參考集群配置文件RocketMQ集群

2.3 啟動失敗

默認(rèn)情況下,我們的服務(wù)器都是單獨的獨立服務(wù)器,不會出現(xiàn)這種情況,但是我們在測試過程中使用的是虛擬機, 配置不夠,會導(dǎo)致無法啟動

修改runbroker.sh 和 runserver.sh

分別找到下圖中的指示位置

修改內(nèi)存大小即可,大小請自己按照自己虛擬機的配置適當(dāng)調(diào)整,比如我修改為了以下值

RocketMQ

RocketMQ

三 圖形化界面

此處非必須,實際開發(fā)中使用較少

下載rocketmq-console源碼:https://github.com/apache/rocketmq-externals

進入子目錄rocketmq-console

執(zhí)行mvn命令打包

mvn clean package -DskipTests

進入target目錄

rocketmq-console-ng-1.0.0.jar即為springBoot項目

在該目錄下CMD執(zhí)行命令:

java -jar rocketmq-console-ng-1.0.0.jar --server.port=12581 --rocketmq.config.namesrvAddr=10.89.0.65:9876?
其中
--server.port為運行的這個web應(yīng)用的端口,如果不設(shè)置的話默認(rèn)為8080;--rocketmq.config.namesrvAddrRocketMQ命名服務(wù)地址,如果不設(shè)置的話默認(rèn)為“”
OK了,訪問下http://localhost:12581試試吧。

或者打包成 war 包扔到 tomcat 中運行

四 入門案例

此案例中使用的是一個消費者,所以消費者代碼只有一個

4.1 pom.xml

??? <dependencies>

? ??????<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
??????? <dependency>
??????????? <groupId>org.apache.rocketmq</groupId>
??????????? <artifactId>rocketmq-client</artifactId>
??????????? <version>4.3.2</version>
??????? </dependency>

??? </dependencies>

4.2 同步消息模式

原理:同步發(fā)送是指消息發(fā)送方發(fā)出數(shù)據(jù)后,會在收到接收方發(fā)回響應(yīng)之后才發(fā)下一個數(shù)據(jù)包的通訊方式。

應(yīng)用場景:此種方式應(yīng)用場景非常廣泛,例如重要通知郵件、報名短信通知、營銷短信系統(tǒng)等。

RocketMQ

4.2.1 生產(chǎn)者

/**
?* Created by jackiechan on 18-8-19/下午8:37.
?* 原理:同步發(fā)送是指消息發(fā)送方發(fā)出數(shù)據(jù)后,會在收到接收方發(fā)回響應(yīng)之后才發(fā)下一個數(shù)據(jù)包的通訊方式。
?*
?* 應(yīng)用場景:此種方式應(yīng)用場景非常廣泛,例如重要通知郵件、報名短信通知、營銷短信系統(tǒng)等
?*/
public class SyncProducer01 {
??? public static void main(String[] args) throws Exception {
??????? //Instantiate with a producer group name.
??????? DefaultMQProducer producer = new
??????????????? DefaultMQProducer("group1");//groupname 同一個group代表是集群
??????? //Launch the instance.
??????? producer.setNamesrvAddr("192.168.3.8:9876");//設(shè)置nameserver地址
??????? //設(shè)置實例名字
??????? producer.setInstanceName("producer");//默認(rèn)不需要設(shè)置,會以ip@pid作為名字, ip是機器ip,pid是jvmpid
??????? producer.start();
??????? for (int i = 0; i < 100; i++) {
??????????? //Create a message instance, specifying topic, tag and message body.
??????????? //topic和tags在消費者那邊獲取到消息后都可以獲取, 可以通過tag區(qū)分消息
??????????? Message msg = new Message("TopicTest" /* Topic 消息所屬的topic */,
??????????????????? "TagA" /* Tag */,
??????????????????? ("Hello RocketMQ " +
??????????????????????????? i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
??????????? );
??????????? //Call send message to deliver message to one of brokers.
??????????? SendResult sendResult = producer.send(msg);
??????????? System.out.printf("%s%n", sendResult);
??????? }
??????? //Shut down once the producer instance is not longer in use.
??????? producer.shutdown();
??? }
}

4.3 異步消息模式

原理:異步發(fā)送是指發(fā)送方發(fā)出數(shù)據(jù)后,不等接收方發(fā)回響應(yīng),接著發(fā)送下個數(shù)據(jù)包的通訊方式。MQ 的異步發(fā)送,需要用戶實現(xiàn)異步發(fā)送回調(diào)接口(SendCallback),在執(zhí)行消息的異步發(fā)送時,應(yīng)用不需要等待服務(wù)器響應(yīng)即可直接返回,通過回調(diào)接口接收務(wù)器響應(yīng),并對服務(wù)器的響應(yīng)結(jié)果進行處理。

應(yīng)用場景:異步發(fā)送一般用于鏈路耗時較長,對 RT 響應(yīng)時間較為敏感的業(yè)務(wù)場景,例如用戶視頻上傳后通知啟動轉(zhuǎn)碼服務(wù),轉(zhuǎn)碼完成后通知推送轉(zhuǎn)碼結(jié)果等。

RocketMQ

4.3.1 生產(chǎn)者


/**
?* Created by jackiechan on 18-8-19/下午10:05
?*
?* @author jackiechan
?* 原理:異步發(fā)送是指發(fā)送方發(fā)出數(shù)據(jù)后,不等接收方發(fā)回響應(yīng),接著發(fā)送下個數(shù)據(jù)包的通訊方式。MQ 的異步發(fā)送,需要用戶實現(xiàn)異步發(fā)送回調(diào)接口(SendCallback),在執(zhí)行消息的異步發(fā)送時,應(yīng)用不需要等待服務(wù)器響應(yīng)即可直接返回,通過回調(diào)接口接收務(wù)器響應(yīng),并對服務(wù)器的響應(yīng)結(jié)果進行處理。
?*
?* 應(yīng)用場景:異步發(fā)送一般用于鏈路耗時較長,對 RT 響應(yīng)時間較為敏感的業(yè)務(wù)場景,例如用戶視頻上傳后通知啟動轉(zhuǎn)碼服務(wù),轉(zhuǎn)碼完成后通知推送轉(zhuǎn)碼結(jié)果等。
?*/
public class AsyncProducer02 {
??? public static void main(String[] args) throws Exception {
??????? //Instantiate with a producer group name.
??????? DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
??????? //Launch the instance.
??????? //Launch the instance.
??????? producer.setNamesrvAddr("192.168.3.8:9876");//設(shè)置nameserver地址
??????? producer.start();
??????? producer.setRetryTimesWhenSendAsyncFailed(0);
??????? for (int i = 0; i < 100; i++) {
??????????? final int index = i;
??????????? //Create a message instance, specifying topic, tag and message body.
??????????? //消息的keys可以作為標(biāo)記或者傳遞其他消息內(nèi)容,可以在消費者獲取到消息后獲取keys進行區(qū)分
??????????? Message msg = new Message("TopicTest",
??????????????????? "TagA",
? ??????????????????"OrderID188",
??????????????????? "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
??????????? //發(fā)送異步消息, 通過設(shè)置回調(diào)來接受服務(wù)器給我們返回的消息
??????????? producer.send(msg, new SendCallback() {
??????????????? //當(dāng)發(fā)送成功的時候執(zhí)行的方法
??????????????? @Override
??????????????? public void onSuccess(SendResult sendResult) {
??????????????????? System.out.printf("%-10d OK %s %n", index,
??????????????????????????? sendResult.getMsgId());
??????????????? }
??????????????? //當(dāng)發(fā)送失敗的時候執(zhí)行
??????????????? @Override
??????????????? public void onException(Throwable e) {
??????????????????? System.out.printf("%-10d Exception %s %n", index, e);
??????????????????? e.printStackTrace();
??????????????? }
??????????? });
??????? }
??????? //Shut down once the producer instance is not longer in use.
??????? //當(dāng)發(fā)送異步消息的時候,producer 不要shutdown,因為回調(diào)是異步的,可能在收到回調(diào)的時候producer關(guān)閉了會出錯
????? //? producer.shutdown();
??? }
}

4.4 單向模式

原理:單向(Oneway)發(fā)送特點為只負(fù)責(zé)發(fā)送消息,不等待服務(wù)器回應(yīng)且沒有回調(diào)函數(shù)觸發(fā),即只發(fā)送請求不等待應(yīng)答。此方式發(fā)送消息的過程耗時非常短,一般在微秒級別。

應(yīng)用場景:適用于某些耗時非常短,但對可靠性要求并不高的場景,例如日志收集。

RocketMQ

4.4.1 生產(chǎn)者


/**
?* Created by jackiechan on 18-8-19/下午10:25
?*
?* @author jackiechan
?* 原理:單向(Oneway)發(fā)送特點為只負(fù)責(zé)發(fā)送消息,不等待服務(wù)器回應(yīng)且沒有回調(diào)函數(shù)觸發(fā),即只發(fā)送請求不等待應(yīng)答。此方式發(fā)送消息的過程耗時非常短,一般在微秒級別。
?*
?* 應(yīng)用場景:適用于某些耗時非常短,但對可靠性要求并不高的場景,例如日志收集。
?*/
public class OnewayProducer03 {
??? public static void main(String[] args) throws Exception{
??????? //Instantiate with a producer group name.
??????? DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
??????? //Launch the instance.
??????? producer.setNamesrvAddr("192.168.3.8:9876");//設(shè)置nameserver地址
??????? producer.start();
??????? for (int i = 0; i < 100; i++) {
??????????? //Create a message instance, specifying topic, tag and message body.
??????????? Message msg = new Message("TopicTest" /* Topic */,
??????????????????? "TagA" /* Tag */,
??????????????????? ("Hello RocketMQ " +
??????????????????????????? i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
??????????? );
??????????? //Call send message to deliver message to one of brokers.
??????????? producer.sendOneway(msg);

??????? }
??????? //Shut down once the producer instance is not longer in use.
??????? producer.shutdown();
??? }
}

4.5消費者

此消費者可以接收上面三種不同的消息


/**
?* Created by jackiechan on 18-8-19/下午9:50
?*
?* @authoe jackiechan
?*/
public class MqConsumer {

??? public static void main(String[] args) {
?????? ?//同一個group代表是集群
??????? DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer_yll");
??????? consumer.setNamesrvAddr("192.168.3.8:9876");
??????? try {
??????????? consumer.subscribe("TopicTest", "TagA||TagB");//可訂閱多個tag,但是一個消息只能有一個tag
??????????? consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
??????????? consumer.registerMessageListener(new MessageListenerConcurrently() {
??????????????? @Override
??????????????? public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
??????????????????? Message msg = list.get(0);
??????????????????? //輸出消息內(nèi)容
??????????????????? System.out.println("收到消息了:"+new String(msg.getBody()));
??????????????????? //此處可以根據(jù)消息的tag或者keys來區(qū)分消息
??????????????????? if (msg.getTags() != null&&msg.getTags().equals("TagA")) {
??????????????????????? //執(zhí)行TagA的邏輯
??????????????????????? System.out.println("收到的是taga的消息");
??????????????????? }
??????????????????? return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
??????????????? }
??????????? });
??????????? consumer.start();
??????? } catch (MQClientException e) {
??????????? System.out.println("出錯了");
??????? }
??? }
}

五 順序消費

消息順序

消息順序是只可以按照消息發(fā)送的順序進行消費。一個訂單產(chǎn)生3條消息,訂單創(chuàng)建、付款、訂單完成。消費時只有按照順序消費才有意義,不可能先消費付款消息再消費訂單創(chuàng)建消息,這樣就亂了。另外,多筆訂單又可以并行消費。如何保證呢?

一個訂單產(chǎn)生的消息只能發(fā)送給同一個MQ服務(wù)器中的同一個分區(qū),并且按順序發(fā)送,這樣才能在理論上保證消費者消費時是按照順序消費的,因為一個分區(qū)就是一個邏輯隊列。生產(chǎn)者雖然按順序發(fā)送,但是第一條消息到達(dá)MQ的耗時比第二條多,那么第二條則會被先消費,這樣就又導(dǎo)致消費時不是順序的。那么如何解決呢?可以采取只有第一條被消費者消費成功后再發(fā)送第二條。看下圖:

RocketMQ

但是如果第一條被發(fā)送到消費者后,消費者沒有響應(yīng)(消費者發(fā)送響應(yīng)但是因為網(wǎng)絡(luò)問題丟失或者消費者就沒有收到消息),那么在這種情況下你是繼續(xù)發(fā)送第二條還是重發(fā)第一條呢?如果是嚴(yán)格消息順序,那肯定是重發(fā)第一條,但是如果是消費者消費后的響應(yīng)丟失了,那么重發(fā)第一條就會造成重復(fù)消費。

從另外一方面看,如果不考慮網(wǎng)絡(luò)異常,那么要實現(xiàn)嚴(yán)格消息,就必須采取一種一對一關(guān)系,生產(chǎn)者A的消息對應(yīng)到MQ服務(wù)器1的X隊列,消費者A消費X隊列。這樣串行結(jié)構(gòu)就會造成系統(tǒng)吞吐量太低;更多異常需要處理比如消費端出現(xiàn)問題,那么整個消息隊列就會出現(xiàn)阻塞。RocketMQ通過輪詢所有隊列來確定消息發(fā)送到哪一個隊列(負(fù)載均衡),比如相同訂單號的消息會被先后發(fā)送到統(tǒng)一隊列中。所以RocketMQ

消息重復(fù)

造成消費重復(fù)的根本原因是網(wǎng)絡(luò)不可達(dá),只要有網(wǎng)絡(luò),這種網(wǎng)絡(luò)的不穩(wěn)定因素就存在你無法規(guī)避。所以解決這個問題的最好辦法就是繞過它。這就變成了,消費端收到兩個一樣的消息后如何處理,而不是從發(fā)送端解決不發(fā)送2個一樣的消息。對于消費端的要求就是:

  • 消費端處理業(yè)務(wù)消息要保持冪等性,也就是同一個東西執(zhí)行多次會得到相同結(jié)果

  • 保證每條消息都有唯一編號切保證消息處理成功與去重表的日志同時出現(xiàn)

第一條好理解,第二條就是利用一張日志表來記錄已經(jīng)處理成功的消息ID,如果新到的消息ID已經(jīng)存在表中那么就不再處理這個消息。第一條是在消費端實現(xiàn)的,不屬于消息系統(tǒng)的功能;第二條可以是消息系統(tǒng)實現(xiàn)也可以是業(yè)務(wù)端實現(xiàn),處于對消息系統(tǒng)的吞吐量和高可用考慮最好還是由消費端去處理。所以這也就是RocketMQ不解決消息重復(fù)的原因。

5.1 生產(chǎn)者


/**
?* Created by jackiechan on 18-8-20/上午12:08
?*
?* @author jackiechan
?*/
public class OrderedProducer {
??? public static void main(String[] args) throws Exception {
??????? //Instantiate with a producer group name.
??????? MQProducer producer = new DefaultMQProducer("example_group_name");
??????? ((DefaultMQProducer) producer).setNamesrvAddr(ServerUtil.SERVERADD);//設(shè)置服務(wù)器地址,請?zhí)鎿Q為自己的服務(wù)器地址
??? ????//Launch the instance.
??????? producer.start();
??????? String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
??????? for (int i = 0; i < 100; i++) {
??????????? int orderId = i % 10;
??????????? int a=i;
??????????? //Create a message instance, specifying topic, tag and message body.
??????????? Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
??????????????????? ("Hello RocketMQ==> " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
???????? ???SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
??????????????? @Override
??????????????? public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

??????????????????? // arg的值其實就是orderId
??????????????????? Integer id = (Integer) arg;

??????????????????? // mqs是隊列集合,也就是topic所對應(yīng)的所有隊列
??????????????????? int index = id % mqs.size();

??????????????????? // 這里根據(jù)前面的id對隊列集合大小求余來返回所對應(yīng)的隊列
??????????????????? System.out.println(index+"====>"+a);
??????????????????? return mqs.get(index);

??????????????? }
??????????? }, orderId);

?????????? // System.out.printf("%s%n", sendResult);
??????? }
??????? //server shutdown
??????? producer.shutdown();
??? }
}

5.2 消費者

消費者有多個,代碼一致


/**
?* Created by jackiechan on 18-8-20/上午12:08
?*
?* @author jackiechan
?* 順序消費的場景,一個業(yè)務(wù)需要從頭到尾按照固定順序執(zhí)行, 比如訂單的順序是 創(chuàng)建訂單-支付-發(fā)貨,必須按照這個順序執(zhí)行, 就可以通過順序消費來解決這個問題
?*/
public class OrderedConsumer {
??? public static void main(String[] args) throws Exception {
??????? DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
??????? consumer.setNamesrvAddr(ServerUtil.SERVERADD);//設(shè)置服務(wù)器地址,實際開發(fā)替換為自己的地址
??????? /**
???????? * 設(shè)置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費
???????? * 如果非第一次啟動,那么按照上次消費的位置繼續(xù)消費
???????? * 這里設(shè)置的是一個consumer的消費策略
???????? *? CONSUME_FROM_LAST_OFFSET 默認(rèn)策略,從該隊列最尾開始消費,即跳過歷史消息
???????? *? CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍
???????? *? CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認(rèn)是半個小時以前
???????? *
???????? */
??????? consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

??????? consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD");
??????? //設(shè)置一個Listener,主要進行消息的邏輯處理
??????? //注意這里使用的是MessageListenerOrderly這個接口
??????? consumer.registerMessageListener(new MessageListenerOrderly() {

??????????? AtomicLong consumeTimes = new AtomicLong(0);
??????????? @Override
??????????? public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
?????? ????????????????????????????????????????????????ConsumeOrderlyContext context) {
??????????????? //返回消費狀態(tài)
??????????????? //SUCCESS 消費成功
??????????????? //SUSPEND_CURRENT_QUEUE_A_MOMENT 消費失敗,暫停當(dāng)前隊列的消費

??????????????? context.setAutoCommit(false);//手動提交
??????????????? System.out.printf(Thread.currentThread().getName()+"消費者1===>" + msgs.get(0).getQueueId() +? "%n"+new String(msgs.get(0).getBody())+ "%n");
??????????????? this.consumeTimes.incrementAndGet();
??????????????? //以下內(nèi)容模擬收消息失敗,或者回滾等操作
// ???????????????if ((this.consumeTimes.get() % 2) == 0) {
//??????????????????? return ConsumeOrderlyStatus.SUCCESS;
//??????????????? } else if ((this.consumeTimes.get() % 3) == 0) {
//??????????????????? return ConsumeOrderlyStatus.ROLLBACK;
//?????????? ?????} else if ((this.consumeTimes.get() % 4) == 0) {
//??????????????????? return ConsumeOrderlyStatus.COMMIT;
//??????????????? } else if ((this.consumeTimes.get() % 5) == 0) {
//??????????????????? context.setSuspendCurrentQueueTimeMillis(3000);
//???? ???????????????return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
//??????????????? }
??????????????? return ConsumeOrderlyStatus.SUCCESS;

??????????? }
??????? });

??????? consumer.start();

??????? System.out.printf("Consumer Started.%n");
??? }
}

經(jīng)過測試發(fā)現(xiàn),不同隊列的消息收取是無序的,但是同一隊列中消息的收取順序是按照發(fā)送順序收取的

六 廣播模式

6.1 生產(chǎn)者

/**
?* Created by jackiechan on 2018/8/20/上午10:22
?*/
public class BroadcastProducer {
??? public static void main(String[] args) throws Exception {
??????? DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
??????? producer.setNamesrvAddr(ServerUtil.SERVERADD);//設(shè)置服務(wù)器地址
??????? producer.start();
??????? for (int i = 0; i < 100; i++){
??????????? //發(fā)送消息
??????????? Message msg = new Message("TopicTest",
??????????????????? "TagA",
??????????????????? "OrderID188",
??????????????????? ("Hello world==>"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
??????????? SendResult sendResult = producer.send(msg);
?? ?????????System.out.printf("%s%n", sendResult);
??????? }
??????? producer.shutdown();
??? }
}

6.2 消費者

消費者有多個,代碼一致


/**
?* Created by jackiechan on 2018/8/20/上午10:23
?* 廣播模式的應(yīng)用場景, 一個業(yè)務(wù)執(zhí)行完成后需要多個不同的后續(xù)業(yè)務(wù)都執(zhí)行,那么他們都需要知道前置業(yè)務(wù)完成,所以大家監(jiān)聽相同消息,同時獲取消息
?* 比如 電商中商品更新完成后, 可能會需要同時更新 redis 緩存與 solr 搜索引擎
?*/
public class BroadcastConsumer1 {
??? public static void main(String[] args) throws Exception {
??????? DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
??????? consumer.setConsumeMessageBatchMaxSize(10);//每次拉取十條
??????? consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
??????? consumer.setNamesrvAddr(ServerUtil.SERVERADD);
??????? //set to broadcast mode,設(shè)置消費模式為廣播
??????? consumer.setMessageModel(MessageModel.BROADCASTING);

??????? consumer.subscribe("TopicTest", "TagA || TagC || TagD");

??????? consumer.registerMessageListener(new MessageListenerConcurrently() {

??????????? @Override
??????????? public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
??????????????????????????????????????????????????????????? ConsumeConcurrentlyContext context) {
??????????????? System.out.printf(Thread.currentThread().getName() + " 消費者1收到消息 : " + new String(msgs.get(0).getBody()) + "%n");
???????????????

網(wǎng)頁標(biāo)題:RocketMQ
本文路徑:http://bm7419.com/article32/pscopc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供微信公眾號、品牌網(wǎng)站設(shè)計網(wǎng)站制作、小程序開發(fā)全網(wǎng)營銷推廣、網(wǎng)站設(shè)計公司

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)