RabbitMQ如何保證消息的可靠性

今天就跟大家聊聊有關(guān)RabbitMQ如何保證消息的可靠性,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

十多年的中陽網(wǎng)站建設(shè)經(jīng)驗,針對設(shè)計、前端、開發(fā)、售后、文案、推廣等六對一服務(wù),響應(yīng)快,48小時及時工作處理。成都營銷網(wǎng)站建設(shè)的優(yōu)勢是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動調(diào)整中陽建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計,從而大程度地提升瀏覽體驗。成都創(chuàng)新互聯(lián)從事“中陽網(wǎng)站設(shè)計”,“中陽網(wǎng)站推廣”以來,每個客戶項目都認(rèn)真落實執(zhí)行。

一條消費成功被消費經(jīng)歷了生產(chǎn)者->MQ->消費者,因此在這三個步驟中都有可能造成消息丟失。

一 消息生產(chǎn)者沒有把消息成功發(fā)送到MQ

1.1 事務(wù)機制

AMQP協(xié)議提供了事務(wù)機制,在投遞消息時開啟事務(wù)支持,如果消息投遞失敗,則回滾事務(wù)。

自定義事務(wù)管理器

@Configuration
public class RabbitTranscation {

    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){
        return new RabbitTransactionManager(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        return new RabbitTemplate(connectionFactory);
    }
}
 

修改yml

spring:
  rabbitmq:
    # 消息在未被隊列收到的情況下返回
    publisher-returns: true
 

開啟事務(wù)支持

rabbitTemplate.setChannelTransacted(true);
 

消息未接收時調(diào)用ReturnCallback

rabbitTemplate.setMandatory(true);
 

生產(chǎn)者投遞消息

@Service
public class ProviderTranscation implements RabbitTemplate.ReturnCallback {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        // 設(shè)置channel開啟事務(wù)
        rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.setReturnCallback(this);
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("這條消息發(fā)送失敗了"+message+",請?zhí)幚?quot;);
    }

    @Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
    public void publishMessage(String message) throws Exception {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.convertAndSend("javatrip",message);
    }
}
 

但是,很少有人這么干,因為這是同步操作,一條消息發(fā)送之后會使發(fā)送端阻塞,以等待RabbitMQ-Server的回應(yīng),之后才能繼續(xù)發(fā)送下一條消息,生產(chǎn)者生產(chǎn)消息的吞吐量和性能都會大大降低。

 

1.2 發(fā)送方確認(rèn)機制

發(fā)送消息時將信道設(shè)置為confirm模式,消息進入該信道后,都會被指派給一個唯一ID,一旦消息被投遞到所匹配的隊列后,RabbitMQ就會發(fā)送給生產(chǎn)者一個確認(rèn)。

開啟消息確認(rèn)機制

spring:
  rabbitmq:
    # 消息在未被隊列收到的情況下返回
    publisher-returns: true
    # 開啟消息確認(rèn)機制
    publisher-confirm-type: correlated
 

消息未接收時調(diào)用ReturnCallback

rabbitTemplate.setMandatory(true);
 

生產(chǎn)者投遞消息

@Service
public class ConfirmProvider implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setReturnCallback(this);
        rabbitTemplate.setConfirmCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            System.out.println("確認(rèn)了這條消息:"+correlationData);
        }else{
            System.out.println("確認(rèn)失敗了:"+correlationData+";出現(xiàn)異常:"+cause);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("這條消息發(fā)送失敗了"+message+",請?zhí)幚?quot;);
    }

    public void publisMessage(String message){
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.convertAndSend("javatrip",message);
    }
}
 

如果消息確認(rèn)失敗后,我們可以進行消息補償,也就是消息的重試機制。當(dāng)未收到確認(rèn)信息時進行消息的重新投遞。設(shè)置如下配置即可完成。

spring:
  rabbitmq:
    # 支持消息發(fā)送失敗后重返隊列
    publisher-returns: true
    # 開啟消息確認(rèn)機制
    publisher-confirm-type: correlated
    listener:
      simple:
        retry:
          # 開啟重試
          enabled: true
          # 最大重試次數(shù)
          max-attempts: 5
          # 重試時間間隔
          initial-interval: 3000
   

二 消息發(fā)送到MQ后,MQ宕機導(dǎo)致內(nèi)存中的消息丟失

消息在MQ中有可能發(fā)生丟失,這時候我們就需要將隊列和消息都進行持久化。

@Queue注解為我們提供了隊列相關(guān)的一些屬性,具體如下:

  1. name: 隊列的名稱;

  2. durable: 是否持久化;

  3. exclusive: 是否獨享、排外的;

  4. autoDelete: 是否自動刪除;

  5. arguments:隊列的其他屬性參數(shù),有如下可選項,可參看圖2的arguments:

    • x-message-ttl:消息的過期時間,單位:毫秒;

    • x-expires:隊列過期時間,隊列在多長時間未被訪問將被刪除,單位:毫秒;

    • x-max-length:隊列最大長度,超過該最大值,則將從隊列頭部開始刪除消息;

    • x-max-length-bytes:隊列消息內(nèi)容占用最大空間,受限于內(nèi)存大小,超過該閾值則從隊列頭部開始刪除消息;

    • x-overflow:設(shè)置隊列溢出行為。這決定了當(dāng)達到隊列的最大長度時消息會發(fā)生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁隊列類型僅支持drop-head;

    • x-dead-letter-exchange:死信交換器名稱,過期或被刪除(因隊列長度超長或因空間超出閾值)的消息可指定發(fā)送到該交換器中;

    • x-dead-letter-routing-key:死信消息路由鍵,在消息發(fā)送到死信交換器時會使用該路由鍵,如果不設(shè)置,則使用消息的原來的路由鍵值

    • x-single-active-consumer:表示隊列是否是單一活動消費者,true時,注冊的消費組內(nèi)只有一個消費者消費消息,其他被忽略,false時消息循環(huán)分發(fā)給所有消費者(默認(rèn)false)

    • x-max-priority:隊列要支持的最大優(yōu)先級數(shù);如果未設(shè)置,隊列將不支持消息優(yōu)先級;

    • x-queue-mode(Lazy mode):將隊列設(shè)置為延遲模式,在磁盤上保留盡可能多的消息,以減少RAM的使用;如果未設(shè)置,隊列將保留內(nèi)存緩存以盡可能快地傳遞消息;

    • x-queue-master-locator:在集群模式下設(shè)置鏡像隊列的主節(jié)點信息。

持久化隊列

創(chuàng)建隊列的時候?qū)⒊志没瘜傩詃urable設(shè)置為true,同時要將autoDelete設(shè)置為false

@Queue(value = "javatrip",durable = "true",autoDelete = "false")
 

持久化消息

發(fā)送消息的時候?qū)⑾⒌膁eliveryMode設(shè)置為2,在Spring Boot中消息默認(rèn)就是持久化的。

 

三 消費者消費消息的時候,未消費完畢就出現(xiàn)了異常

消費者剛消費了消息,還沒有處理業(yè)務(wù),結(jié)果發(fā)生異常。這時候就需要關(guān)閉自動確認(rèn),改為手動確認(rèn)消息。

修改yml為手動簽收模式

spring:
  rabbitmq:
    listener:
      simple:
        # 手動簽收模式
        acknowledge-mode: manual
        # 每次簽收一條消息
        prefetch: 1
 

消費者手動簽收

@Component
@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
public class Consumer {

    @RabbitHandler
    public void receive(String message, @Headers Map<String,Object> headers, Channel channel) throws Exception{

        System.out.println(message);
        // 唯一的消息ID
        Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // 確認(rèn)該條消息
        if(...){
            channel.basicAck(deliverTag,false);
        }else{
            // 消費失敗,消息重返隊列
            channel.basicNack(deliverTag,false,true);
        }

    }
}
   

四 總結(jié)

 
消息丟失的原因?

生產(chǎn)者、MQ、消費者都有可能造成消息丟失

如何保證消息的可靠性?
  • 發(fā)送方采取發(fā)送者確認(rèn)模式

  • MQ進行隊列及消息的持久化

  • 消費者消費成功后手動確認(rèn)消息

看完上述內(nèi)容,你們對RabbitMQ如何保證消息的可靠性有進一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。

當(dāng)前題目:RabbitMQ如何保證消息的可靠性
文章路徑:http://bm7419.com/article6/pcccig.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供建站公司App開發(fā)、品牌網(wǎng)站制作、網(wǎng)站收錄、服務(wù)器托管App設(shè)計

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站網(wǎng)頁設(shè)計