如何利用MQ實現(xiàn)事務(wù)補償

本篇內(nèi)容介紹了“如何利用MQ實現(xiàn)事務(wù)補償”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

從策劃到設(shè)計制作,每一步都追求做到細(xì)膩,制作可持續(xù)發(fā)展的企業(yè)網(wǎng)站。為客戶提供成都網(wǎng)站建設(shè)、做網(wǎng)站、網(wǎng)站策劃、網(wǎng)頁設(shè)計、域名注冊、網(wǎng)頁空間、網(wǎng)絡(luò)營銷、VI設(shè)計、 網(wǎng)站改版、漏洞修補等服務(wù)。為客戶提供更好的一站式互聯(lián)網(wǎng)解決方案,以客戶的口碑塑造優(yōu)易品牌,攜手廣大客戶,共同發(fā)展進(jìn)步。

rabbitMQ 在互聯(lián)網(wǎng)公司有著大規(guī)模應(yīng)用,本篇將實戰(zhàn)介紹 springboot 整合 rabbitMQ,同時也將在具體的業(yè)務(wù)場景中介紹利用 MQ  實現(xiàn)事務(wù)補償操作。

一、介紹

本篇我們一起來實操一下SpringBoot整合rabbitMQ,為后續(xù)業(yè)務(wù)處理做鋪墊。

廢話不多說,直奔主題!

二、整合實戰(zhàn)

2.1、創(chuàng)建一個 maven 工程,引入 amqp 包

<!--amqp 支持--> <dependency>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

2.2、在全局文件中配置 rabbitMQ 服務(wù)信息

spring.rabbitmq.addresses=197.168.24.206:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/

其中,spring.rabbitmq.addresses參數(shù)值為 rabbitmq 服務(wù)器地址

2.3、編寫 rabbitmq 配置類

@Slf4j @Configuration public class RabbitConfig {      /**      * 初始化連接工廠      * @param addresses      * @param userName      * @param password      * @param vhost      * @return      */     @Bean     ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.addresses}") String addresses,                                         @Value("${spring.rabbitmq.username}") String userName,                                         @Value("${spring.rabbitmq.password}") String password,                                         @Value("${spring.rabbitmq.virtual-host}") String vhost) {         CachingConnectionFactory connectionFactory = new CachingConnectionFactory();         connectionFactory.setAddresses(addresses);         connectionFactory.setUsername(userName);         connectionFactory.setPassword(password);         connectionFactory.setVirtualHost(vhost);         return connectionFactory;     }      /**      * 重新實例化 RabbitAdmin 操作類      * @param connectionFactory      * @return      */     @Bean     public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){         return new RabbitAdmin(connectionFactory);     }      /**      * 重新實例化 RabbitTemplate 操作類      * @param connectionFactory      * @return      */     @Bean     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){         RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);         //數(shù)據(jù)轉(zhuǎn)換為json存入消息隊列         rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());         return rabbitTemplate;     }      /**      * 將 RabbitUtil 操作工具類加入IOC容器      * @return      */     @Bean     public RabbitUtil rabbitUtil(){         return new RabbitUtil();     }  }

2.4、編寫 RabbitUtil 工具類

public class RabbitUtil {      private static final Logger logger = LoggerFactory.getLogger(RabbitUtil.class);      @Autowired     private RabbitAdmin rabbitAdmin;      @Autowired     private RabbitTemplate rabbitTemplate;      /**      * 創(chuàng)建Exchange      * @param exchangeName      */     public void addExchange(String exchangeType, String exchangeName){         Exchange exchange = createExchange(exchangeType, exchangeName);         rabbitAdmin.declareExchange(exchange);     }      /**      * 刪除一個Exchange      * @param exchangeName      */     public boolean deleteExchange(String exchangeName){         return rabbitAdmin.deleteExchange(exchangeName);     }      /**      * 創(chuàng)建一個指定的Queue      * @param queueName      * @return queueName      */     public void addQueue(String queueName){         Queue queue = createQueue(queueName);         rabbitAdmin.declareQueue(queue);     }      /**      * 刪除一個queue      * @return queueName      * @param queueName      */     public boolean deleteQueue(String queueName){         return rabbitAdmin.deleteQueue(queueName);     }      /**      * 按照篩選條件,刪除隊列      * @param queueName      * @param unused 是否被使用      * @param empty 內(nèi)容是否為空      */     public void deleteQueue(String queueName, boolean unused, boolean empty){         rabbitAdmin.deleteQueue(queueName,unused,empty);     }      /**      * 清空某個隊列中的消息,注意,清空的消息并沒有被消費      * @return queueName      * @param queueName      */     public void purgeQueue(String queueName){         rabbitAdmin.purgeQueue(queueName, false);     }      /**      * 判斷指定的隊列是否存在      * @param queueName      * @return      */     public boolean existQueue(String queueName){         return rabbitAdmin.getQueueProperties(queueName) == null ? false : true;     }      /**      * 綁定一個隊列到一個匹配型交換器使用一個routingKey      * @param exchangeType      * @param exchangeName      * @param queueName      * @param routingKey      * @param isWhereAll      * @param headers EADERS模式類型設(shè)置,其他模式類型傳空      */     public void addBinding(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){         Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);         rabbitAdmin.declareBinding(binding);     }      /**      * 聲明綁定      * @param binding      */     public void addBinding(Binding binding){         rabbitAdmin.declareBinding(binding);     }      /**      * 解除交換器與隊列的綁定      * @param exchangeType      * @param exchangeName      * @param queueName      * @param routingKey      * @param isWhereAll      * @param headers      */     public void removeBinding(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){         Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);         removeBinding(binding);     }      /**      * 解除交換器與隊列的綁定      * @param binding      */     public void removeBinding(Binding binding){         rabbitAdmin.removeBinding(binding);     }      /**      * 創(chuàng)建一個交換器、隊列,并綁定隊列      * @param exchangeType      * @param exchangeName      * @param queueName      * @param routingKey      * @param isWhereAll      * @param headers      */     public void andExchangeBindingQueue(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){         //聲明交換器         addExchange(exchangeType, exchangeName);         //聲明隊列         addQueue(queueName);         //聲明綁定關(guān)系         addBinding(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);     }      /**      * 發(fā)送消息      * @param exchange      * @param routingKey      * @param object      */     public void convertAndSend(String exchange, String routingKey, final Object object){         rabbitTemplate.convertAndSend(exchange, routingKey, object);     }      /**      * 轉(zhuǎn)換Message對象      * @param messageType      * @param msg      * @return      */     public Message getMessage(String messageType, Object msg){         MessageProperties messageProperties = new MessageProperties();         messageProperties.setContentType(messageType);         Message message = new Message(msg.toString().getBytes(),messageProperties);         return message;     }      /**      * 聲明交換機(jī)      * @param exchangeType      * @param exchangeName      * @return      */     private Exchange createExchange(String exchangeType, String exchangeName){         if(ExchangeType.DIRECT.equals(exchangeType)){             return new DirectExchange(exchangeName);         }         if(ExchangeType.TOPIC.equals(exchangeType)){             return new TopicExchange(exchangeName);         }         if(ExchangeType.HEADERS.equals(exchangeType)){             return new HeadersExchange(exchangeName);         }         if(ExchangeType.FANOUT.equals(exchangeType)){             return new FanoutExchange(exchangeName);         }         return null;     }      /**      * 聲明綁定關(guān)系      * @param exchangeType      * @param exchangeName      * @param queueName      * @param routingKey      * @param isWhereAll      * @param headers      * @return      */     private Binding bindingBuilder(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){         if(ExchangeType.DIRECT.equals(exchangeType)){             return BindingBuilder.bind(new Queue(queueName)).to(new DirectExchange(exchangeName)).with(routingKey);         }         if(ExchangeType.TOPIC.equals(exchangeType)){             return BindingBuilder.bind(new Queue(queueName)).to(new TopicExchange(exchangeName)).with(routingKey);         }         if(ExchangeType.HEADERS.equals(exchangeType)){             if(isWhereAll){                 return BindingBuilder.bind(new Queue(queueName)).to(new HeadersExchange(exchangeName)).whereAll(headers).match();             }else{                 return BindingBuilder.bind(new Queue(queueName)).to(new HeadersExchange(exchangeName)).whereAny(headers).match();             }         }         if(ExchangeType.FANOUT.equals(exchangeType)){             return BindingBuilder.bind(new Queue(queueName)).to(new FanoutExchange(exchangeName));         }         return null;     }      /**      * 聲明隊列      * @param queueName      * @return      */     private Queue createQueue(String queueName){         return new Queue(queueName);     }       /**      * 交換器類型      */     public final static class ExchangeType {          /**          * 直連交換機(jī)(全文匹配)          */         public final static String DIRECT = "DIRECT";          /**          * 通配符交換機(jī)(兩種通配符:*只能匹配一個單詞,#可以匹配零個或多個)          */         public final static String TOPIC = "TOPIC";          /**          * 頭交換機(jī)(自定義鍵值對匹配,根據(jù)發(fā)送消息內(nèi)容中的headers屬性進(jìn)行匹配)          */         public final static String HEADERS = "HEADERS";          /**          * 扇形(廣播)交換機(jī) (將消息轉(zhuǎn)發(fā)到所有與該交互機(jī)綁定的隊列上)          */         public final static String FANOUT = "FANOUT";     } }

此致, rabbitMQ 核心操作功能操作已經(jīng)開發(fā)完畢!

2.5、編寫隊列監(jiān)聽類(靜態(tài))

@Slf4j @Configuration public class DirectConsumeListener {      /**      * 監(jiān)聽指定隊列,名稱:mq.direct.1      * @param message      * @param channel      * @throws IOException      */     @RabbitListener(queues = "mq.direct.1")     public void consume(Message message, Channel channel) throws IOException {         log.info("DirectConsumeListener,收到消息: {}", message.toString());     } }

如果你需要監(jiān)聽指定的隊列,只需要方法上加上@RabbitListener(queues = "")即可,同時填寫對應(yīng)的隊列名稱。

但是,如果你想動態(tài)監(jiān)聽隊列,而不是通過寫死在方法上呢?

請看下面介紹!

2.6、編寫隊列監(jiān)聽類(動態(tài))

重新實例化一個SimpleMessageListenerContainer對象,這個對象就是監(jiān)聽容器。

@Slf4j @Configuration public class DynamicConsumeListener {      /**      * 使用SimpleMessageListenerContainer實現(xiàn)動態(tài)監(jiān)聽      * @param connectionFactory      * @return      */     @Bean     public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);         container.setMessageListener((MessageListener) message -> {             log.info("ConsumerMessageListen,收到消息: {}", message.toString());         });         return container;     } }

如果想向SimpleMessageListenerContainer添加監(jiān)聽隊列或者移除隊列,只需通過如下方式即可操作。

@Slf4j @RestController @RequestMapping("/consumer") public class ConsumerController {      @Autowired     private SimpleMessageListenerContainer container;      @Autowired     private RabbitUtil rabbitUtil;      /**      * 添加隊列到監(jiān)聽器      * @param consumerInfo      */     @PostMapping("addQueue")     public void addQueue(@RequestBody ConsumerInfo consumerInfo) {         boolean existQueue = rabbitUtil.existQueue(consumerInfo.getQueueName());         if(!existQueue){             throw new CommonExecption("當(dāng)前隊列不存在");         }         //消費mq消息的類         container.addQueueNames(consumerInfo.getQueueName());         //打印監(jiān)聽容器中正在監(jiān)聽到隊列         log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames()));     }      /**      * 移除正在監(jiān)聽的隊列      * @param consumerInfo      */     @PostMapping("removeQueue")     public void removeQueue(@RequestBody ConsumerInfo consumerInfo) {         //消費mq消息的類         container.removeQueueNames(consumerInfo.getQueueName());         //打印監(jiān)聽容器中正在監(jiān)聽到隊列         log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames()));     }      /**      * 查詢監(jiān)聽容器中正在監(jiān)聽到隊列      */     @PostMapping("queryListenerQueue")     public void queryListenerQueue() {         log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames()));     } }

2.7、發(fā)送消息到交換器

發(fā)送消息到交換器,非常簡單,只需要通過如下方式即可!

  • 先編寫一個請求參數(shù)實體類

@Data public class ProduceInfo implements Serializable {      private static final long serialVersionUID = 1l;      /**      * 交換器名稱      */     private String exchangeName;      /**      * 路由鍵key      */     private String routingKey;      /**      * 消息內(nèi)容      */     public String msg; }
  • 編寫接口api

@RestController @RequestMapping("/produce") public class ProduceController {      @Autowired     private RabbitUtil rabbitUtil;      /**      * 發(fā)送消息到交換器      * @param produceInfo      */     @PostMapping("sendMessage")     public void sendMessage(@RequestBody ProduceInfo produceInfo) {         rabbitUtil.convertAndSend(produceInfo.getExchangeName(), produceInfo.getRoutingKey(), produceInfo);     }  }

當(dāng)然,你也可以直接使用rabbitTemplate操作類,來實現(xiàn)發(fā)送消息。

rabbitTemplate.convertAndSend(exchange, routingKey, message);

參數(shù)內(nèi)容解釋:

  • exchange:表示交換器名稱

  • routingKey:表示路由鍵key

  • message:表示消息

2.8、交換器、隊列維護(hù)操作

如果想通過接口對 rabbitMQ 中的交換器、隊列以及綁定關(guān)系進(jìn)行維護(hù),通過如下方式接口操作,即可實現(xiàn)!

先編寫一個請求參數(shù)實體類

@Data public class QueueConfig implements Serializable{      private static final long serialVersionUID = 1l;      /**      * 交換器類型      */     private String exchangeType;      /**      * 交換器名稱      */     private String exchangeName;      /**      * 隊列名稱      */     private String queueName;      /**      * 路由鍵key      */     private String routingKey; }

編寫接口api

/**  * rabbitMQ管理操作控制層  */ @RestController @RequestMapping("/config") public class RabbitController {       @Autowired     private RabbitUtil rabbitUtil;      /**      * 創(chuàng)建交換器      * @param config      */     @PostMapping("addExchange")     public void addExchange(@RequestBody QueueConfig config) {         rabbitUtil.addExchange(config.getExchangeType(), config.getExchangeName());     }      /**      * 刪除交換器      * @param config      */     @PostMapping("deleteExchange")     public void deleteExchange(@RequestBody QueueConfig config) {         rabbitUtil.deleteExchange(config.getExchangeName());     }      /**      * 添加隊列      * @param config      */     @PostMapping("addQueue")     public void addQueue(@RequestBody QueueConfig config) {         rabbitUtil.addQueue(config.getQueueName());     }      /**      * 刪除隊列      * @param config      */     @PostMapping("deleteQueue")     public void deleteQueue(@RequestBody QueueConfig config) {         rabbitUtil.deleteQueue(config.getQueueName());     }      /**      * 清空隊列數(shù)據(jù)      * @param config      */     @PostMapping("purgeQueue")     public void purgeQueue(@RequestBody QueueConfig config) {         rabbitUtil.purgeQueue(config.getQueueName());     }      /**      * 添加綁定      * @param config      */     @PostMapping("addBinding")     public void addBinding(@RequestBody QueueConfig config) {         rabbitUtil.addBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), false, null);     }      /**      * 解除綁定      * @param config      */     @PostMapping("removeBinding")     public void removeBinding(@RequestBody QueueConfig config) {         rabbitUtil.removeBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), false, null);     }      /**      * 創(chuàng)建頭部類型的交換器      * 判斷條件是所有的鍵值對都匹配成功才發(fā)送到隊列      * @param config      */     @PostMapping("andExchangeBindingQueueOfHeaderAll")     public void andExchangeBindingQueueOfHeaderAll(@RequestBody QueueConfig config) {         HashMap<String, Object> header = new HashMap<>();         header.put("queue", "queue");         header.put("bindType", "whereAll");         rabbitUtil.andExchangeBindingQueue(RabbitUtil.ExchangeType.HEADERS, config.getExchangeName(), config.getQueueName(), null, true, header);     }      /**      * 創(chuàng)建頭部類型的交換器      * 判斷條件是只要有一個鍵值對匹配成功就發(fā)送到隊列      * @param config      */     @PostMapping("andExchangeBindingQueueOfHeaderAny")     public void andExchangeBindingQueueOfHeaderAny(@RequestBody QueueConfig config) {         HashMap<String, Object> header = new HashMap<>();         header.put("queue", "queue");         header.put("bindType", "whereAny");         rabbitUtil.andExchangeBindingQueue(RabbitUtil.ExchangeType.HEADERS, config.getExchangeName(), config.getQueueName(), null, false, header);     } }

至此,rabbitMQ 管理器基本的 crud 全部開發(fā)完成!

三、利用 MQ 實現(xiàn)事務(wù)補償

當(dāng)然,我們花了這么大的力氣,絕不僅僅是為了將 rabbitMQ 通過 web  項目將其管理起來,最重要的是能投入業(yè)務(wù)使用中去!

上面的操作只是告訴我們怎么使用 rabbitMQ!

  • 當(dāng)你仔細(xì)回想整個過程的時候,其實還是回到最初那個問題,什么時候使用 MQ ?

以常見的訂單系統(tǒng)為例,用戶點擊【下單】按鈕之后的業(yè)務(wù)邏輯可能包括:支付訂單、扣減庫存、生成相應(yīng)單據(jù)、發(fā)紅包、發(fā)短信通知等等。

在業(yè)務(wù)發(fā)展初期這些邏輯可能放在一起同步執(zhí)行,隨著業(yè)務(wù)的發(fā)展訂單量增長,需要提升系統(tǒng)服務(wù)的性能,這時可以將一些不需要立即生效的操作拆分出來異步執(zhí)行,比如發(fā)放紅包、發(fā)短信通知等。這種場景下就可以用  MQ ,在下單的主流程(比如扣減庫存、生成相應(yīng)單據(jù))完成之后發(fā)送一條消息到 MQ 讓主流程快速完結(jié),而由另外的單獨線程拉取 MQ 的消息(或者由 MQ  推送消息),當(dāng)發(fā)現(xiàn) MQ 中有發(fā)紅包或發(fā)短信之類的消息時,執(zhí)行相應(yīng)的業(yè)務(wù)邏輯。

這種是利用 MQ 實現(xiàn)業(yè)務(wù)解耦,其它的場景包括最終一致性、廣播、錯峰流控等等。

利用 MQ 實現(xiàn)業(yè)務(wù)解耦的過程其實也很簡單。

  • 當(dāng)主流程結(jié)束之后,將消息推送到發(fā)紅包、發(fā)短信交換器中即可

@Service public class OrderService {      @Autowired     private RabbitUtil rabbitUtil;      /**      * 創(chuàng)建訂單      * @param order      */     @Transactional     public void createOrder(Order order){         //1、創(chuàng)建訂單         //2、調(diào)用庫存接口,減庫存         //3、向客戶發(fā)放紅包         rabbitUtil.convertAndSend("exchange.send.bonus", null, order);         //4、發(fā)短信通知         rabbitUtil.convertAndSend("exchange.sms.message", null, order);     }  }
  • 監(jiān)聽發(fā)紅包操作

/**  * 監(jiān)聽發(fā)紅包  * @param message  * @param channel  * @throws IOException  */ @RabbitListener(queues = "exchange.send.bonus") public void consume(Message message, Channel channel) throws IOException {     String msgJson = new String(message.getBody(),"UTF-8");     log.info("收到消息: {}", message.toString());      //調(diào)用發(fā)紅包接口 }

監(jiān)聽發(fā)短信操作

/**  * 監(jiān)聽發(fā)短信  * @param message  * @param channel  * @throws IOException  */ @RabbitListener(queues = "exchange.sms.message") public void consume(Message message, Channel channel) throws IOException {     String msgJson = new String(message.getBody(),"UTF-8");     log.info("收到消息: {}", message.toString());      //調(diào)用發(fā)短信接口 }

既然 MQ 這么好用,那是不是完全可以將以前的業(yè)務(wù)也按照整個模型進(jìn)行拆分呢?

答案顯然不是!

當(dāng)引入 MQ 之后業(yè)務(wù)的確是解耦了,但是當(dāng) MQ 一旦掛了,所有的服務(wù)基本都掛了,是不是很可怕!

但是沒關(guān)系,俗話說,兵來將擋、水來土掩,這句話同樣適用于 IT 開發(fā)者,有坑填坑!

“如何利用MQ實現(xiàn)事務(wù)補償”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

本文名稱:如何利用MQ實現(xiàn)事務(wù)補償
網(wǎng)頁地址:http://bm7419.com/article24/igoeje.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供移動網(wǎng)站建設(shè)、企業(yè)建站、響應(yīng)式網(wǎng)站、面包屑導(dǎo)航、定制網(wǎng)站手機(jī)網(wǎng)站建設(shè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)頁設(shè)計公司