RabbitMQ如何保證隊(duì)列里的消息99.99%被消費(fèi)?-創(chuàng)新互聯(lián)

1. 本篇概要

其實(shí),還有1種場景需要考慮:當(dāng)消費(fèi)者接收到消息后,還沒處理完業(yè)務(wù)邏輯,消費(fèi)者掛掉了,那消息也算丟失了?,比如用戶下單,訂單中心發(fā)送了1個(gè)消息到RabbitMQ里的隊(duì)列,積分中心收到這個(gè)消息,準(zhǔn)備給這個(gè)下單的用戶增加20積分,但積分還沒增加成功呢,積分中心自己掛掉了,導(dǎo)致數(shù)據(jù)出現(xiàn)問題。

建華網(wǎng)站建設(shè)公司創(chuàng)新互聯(lián)公司,建華網(wǎng)站設(shè)計(jì)制作,有大型網(wǎng)站制作公司豐富經(jīng)驗(yàn)。已為建華成百上千家提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\成都外貿(mào)網(wǎng)站建設(shè)公司要多少錢,請找那個(gè)售后服務(wù)好的建華做網(wǎng)站的公司定做!

那么如何解決這種問題呢?

為了保證消息被消費(fèi)者成功的消費(fèi),RabbitMQ提供了消息確認(rèn)機(jī)制(message acknowledgement),本文主要講解RabbitMQ中,如何使用消息確認(rèn)機(jī)制來保證消息被消費(fèi)者成功的消費(fèi),避免因?yàn)橄M(fèi)者突然宕機(jī)而引起的消息丟失。

2. 開啟顯式Ack模式

我們開啟一個(gè)消費(fèi)者的代碼是這樣的:

// 創(chuàng)建隊(duì)列消費(fèi)者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("Received Message '" + message + "'");
    }
};
channel.basicConsume(QUEUE_NAME, true, consumer);

這里的重點(diǎn)是channel.basicConsume(QUEUE_NAME, true, consumer);方法的第2個(gè)參數(shù),讓我們先看下basicConsume()的源碼:

public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException {
    return this.basicConsume(queue, autoAck, "", callback);
}

這里的autoAck參數(shù)指的是是否自動確認(rèn),如果設(shè)置為ture,RabbitMQ會自動把發(fā)送出去的消息置為確認(rèn),然后從內(nèi)存(或者磁盤)中刪除,而不管消費(fèi)者接收到消息是否處理成功;如果設(shè)置為false,RabbitMQ會等待消費(fèi)者顯式的回復(fù)確認(rèn)信號后才會從內(nèi)存(或者磁盤)中刪除。

建議將autoAck設(shè)置為false,這樣消費(fèi)者就有足夠的時(shí)間處理消息,不用擔(dān)心處理消息過程中消費(fèi)者宕機(jī)造成消息丟失。

此時(shí),隊(duì)列里的消息就分成了2個(gè)部分:

  1. 等待投遞給消費(fèi)者的消息(下圖中的Ready部分)
  2. 已經(jīng)投遞給消費(fèi)者,但是還沒有收到消費(fèi)者確認(rèn)信號的消息(下圖中的Unacked部分)

RabbitMQ如何保證隊(duì)列里的消息99.99%被消費(fèi)?

如果RabbitMQ一直沒有收到消費(fèi)者的確認(rèn)信號,并且消費(fèi)此消息的消費(fèi)者已經(jīng)斷開連接,則RabbitMQ會安排該消息重新進(jìn)入隊(duì)列,等待投遞給下一個(gè)消費(fèi)者,當(dāng)然也有可能還是原來的那個(gè)消費(fèi)者。

RabbitMQ不會為未確認(rèn)的消息設(shè)置過期時(shí)間,它判斷此消息是否需要重新投遞給消費(fèi)者的唯一依據(jù)是消費(fèi)該消息的消費(fèi)者連接是否已經(jīng)斷開,這么設(shè)計(jì)的原因是RabbitMQ允許消費(fèi)者消費(fèi)一條消息的時(shí)間可以很久很久。

為了便于理解,我們舉個(gè)具體的例子,生產(chǎn)者的話的我們延用上文中的DurableProducer:

package com.zwwhnly.springbootaction.rabbitmq.durable;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DurableProducer {
    private final static String EXCHANGE_NAME = "durable-exchange";
    private final static String QUEUE_NAME = "durable-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 創(chuàng)建連接
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置 RabbitMQ 的主機(jī)名
        factory.setHost("localhost");
        // 創(chuàng)建一個(gè)連接
        Connection connection = factory.newConnection();
        // 創(chuàng)建一個(gè)通道
        Channel channel = connection.createChannel();
        // 創(chuàng)建一個(gè)Exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 發(fā)送消息
        String message = "durable exchange test";
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
        channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());

        // 關(guān)閉頻道和連接
        channel.close();
        connection.close();
    }
}

然后新建一個(gè)消費(fèi)者AckConsumer類:

package com.zwwhnly.springbootaction.rabbitmq.ack;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class AckConsumer {
    private final static String QUEUE_NAME = "durable-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 創(chuàng)建連接
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置 RabbitMQ 的主機(jī)名
        factory.setHost("localhost");
        // 創(chuàng)建一個(gè)連接
        Connection connection = factory.newConnection();
        // 創(chuàng)建一個(gè)通道
        Channel channel = connection.createChannel();
        // 創(chuàng)建隊(duì)列消費(fèi)者
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                int result = 1 / 0;
                System.out.println("Received Message '" + message + "'");
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

我們先將autoAck參數(shù)設(shè)置為ture,即自動確認(rèn),并在消費(fèi)消息時(shí)故意寫個(gè)異常,然后先運(yùn)行生產(chǎn)者客戶端將消息寫入隊(duì)列中,然后運(yùn)行消費(fèi)者客戶端,發(fā)現(xiàn)消息未消費(fèi)成功但是卻消失了:

RabbitMQ如何保證隊(duì)列里的消息99.99%被消費(fèi)?

RabbitMQ如何保證隊(duì)列里的消息99.99%被消費(fèi)?

然后我們將autoAck設(shè)置為false:

channel.basicConsume(QUEUE_NAME, false, consumer);

再次運(yùn)行生產(chǎn)者客戶端將消息寫入隊(duì)列中,然后運(yùn)行消費(fèi)者客戶端,此時(shí)雖然消費(fèi)者客戶端仍然代碼異常,但是消息仍然在隊(duì)列中:

RabbitMQ如何保證隊(duì)列里的消息99.99%被消費(fèi)?

然后我們刪除掉消費(fèi)者客戶端中的異常代碼,重新啟動消費(fèi)者客戶端,發(fā)現(xiàn)消息消費(fèi)成功了,但是消息一直未Ack:

RabbitMQ如何保證隊(duì)列里的消息99.99%被消費(fèi)?

RabbitMQ如何保證隊(duì)列里的消息99.99%被消費(fèi)?

手動停掉消費(fèi)者客戶端,發(fā)現(xiàn)消息又到了Ready狀態(tài),準(zhǔn)備重新投遞:

RabbitMQ如何保證隊(duì)列里的消息99.99%被消費(fèi)?

之所以消費(fèi)掉消息,卻一直還是Unacked狀態(tài),是因?yàn)槲覀儧]在代碼中添加顯式的Ack代碼:

String message = new String(body, "UTF-8");
//int result = 1 / 0;
System.out.println("Received Message '" + message + "'");

long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);

deliveryTag可以看做消息的編號,它是一個(gè)64位的長×××值。

此時(shí)運(yùn)行消費(fèi)者客戶端,發(fā)現(xiàn)消息消費(fèi)成功,并且在隊(duì)列中被移除:

RabbitMQ如何保證隊(duì)列里的消息99.99%被消費(fèi)?

RabbitMQ如何保證隊(duì)列里的消息99.99%被消費(fèi)?

文末彩蛋

Java學(xué)習(xí)、面試;文檔、視頻資源免費(fèi)獲取

RabbitMQ如何保證隊(duì)列里的消息99.99%被消費(fèi)?

創(chuàng)新互聯(lián)www.cdcxhl.cn,專業(yè)提供香港、美國云服務(wù)器,動態(tài)BGP最優(yōu)骨干路由自動選擇,持續(xù)穩(wěn)定高效的網(wǎng)絡(luò)助力業(yè)務(wù)部署。公司持有工信部辦法的idc、isp許可證, 機(jī)房獨(dú)有T級流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確進(jìn)行流量調(diào)度,確保服務(wù)器高可用性。佳節(jié)活動現(xiàn)已開啟,新人活動云服務(wù)器買多久送多久。

本文標(biāo)題:RabbitMQ如何保證隊(duì)列里的消息99.99%被消費(fèi)?-創(chuàng)新互聯(lián)
標(biāo)題路徑:http://bm7419.com/article44/dseiee.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供外貿(mào)建站、標(biāo)簽優(yōu)化、服務(wù)器托管、網(wǎng)站維護(hù)、網(wǎng)站策劃、微信小程序

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

微信小程序開發(fā)