日本黄色一级经典视频|伊人久久精品视频|亚洲黄色色周成人视频九九九|av免费网址黄色小短片|黄色Av无码亚洲成年人|亚洲1区2区3区无码|真人黄片免费观看|无码一级小说欧美日免费三级|日韩中文字幕91在线看|精品久久久无码中文字幕边打电话

當(dāng)前位置:首頁(yè) > > 架構(gòu)師社區(qū)
[導(dǎo)讀]我們知道,消息在RabbitMQ的整個(gè)生命周期是生產(chǎn)者投遞消息到Exchange,Exchange根據(jù)路由鍵將消息路由到合適的Queue,Queue再將消息推(或消費(fèi)者主動(dòng)拉)給消費(fèi)者。

我們知道,消息在RabbitMQ的整個(gè)生命周期是生產(chǎn)者投遞消息ExchangeExchange根據(jù)路由鍵消息路由到合適的Queue,Queue再將消息推(或消費(fèi)者主動(dòng)拉)給消費(fèi)者。

在這個(gè)過(guò)程當(dāng)中,Exchange根據(jù)路由鍵將消息路由到合適的Queue的過(guò)程,可能發(fā)生諸如

  1. Exchange沒(méi)有任何Queue與其綁定,
  2. 或者根據(jù)消息的路由鍵,沒(méi)有任何一個(gè)合適的Queue來(lái)投遞消息,

從而導(dǎo)致消息路由失敗。對(duì)于這些路由失敗的消息應(yīng)該如何處理呢?有兩種方式:

  1. 將消息返回給投遞該條消息的生產(chǎn)者。
  2. 使用備份交換機(jī) alternate-exchange(AE)。

方式1:將消息返回給投遞該條消息的生產(chǎn)者

  • 配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=futao
spring.rabbitmq.password=123456789
spring.rabbitmq.virtual-host=/tech-sharing

# 當(dāng)exchange無(wú)法找到任何一個(gè)合適的queue時(shí),將消息return給生產(chǎn)者
spring.rabbitmq.template.mandatory=true
# 必須設(shè)置為true,否則消息消息路由失敗也無(wú)法觸發(fā)Return回調(diào)
spring.rabbitmq.publisher-returns=true
  • 交換機(jī)定義與消息發(fā)送
@Slf4j @Component public class NoMatchQueue { /**
     * 交換機(jī)名稱
     */ public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE"; @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void send() {
        log.info("發(fā)送消息");
        Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus());
        Message message = MessageBuilder
                .withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8))
                .setContentEncoding(StandardCharsets.UTF_8.displayName())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .build();
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);
    }
} @Configuration class ExchangeDeclare { /**
     * 只定義一個(gè)交換機(jī),但是不綁定任何Queue,所以發(fā)送到該Exchange的消息都會(huì)路由失敗
     *
     * @return */ @Bean public Exchange noMatchQueueExchange() { return ExchangeBuilder
                .topicExchange(NoMatchQueue.EXCHANGE_NAME)
                .durable(true)
                .build();
    }
}
  • 設(shè)置回調(diào)函數(shù)
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) {
        log.error("消息被退回:{}", returnedMessage);
    }
});
  • 消息被退回:且可以看到原因是無(wú)法路由

RabbitMQ消息路由失敗的處理方案(回調(diào)與備份交換機(jī)AE)


方式2:使用備份交換機(jī)

使用方式1需要我們?cè)诔绦蛑羞M(jìn)行編碼設(shè)置回調(diào)函數(shù)監(jiān)聽(tīng),增加了生產(chǎn)者代碼的復(fù)雜性,那么為了消息不丟失還有沒(méi)有其他方式來(lái)處理路由失敗的消息呢:答案是使用備份交換機(jī)。

  • 相較于使用回調(diào)函數(shù),使用備份交換機(jī)只需要給交換機(jī)綁定一個(gè)備份交換機(jī)即可,當(dāng)消息路由失敗之后,消息將投遞到備份交換機(jī),再由備份交換機(jī)路由消息到備份隊(duì)列。這樣我們只需要關(guān)注這個(gè)備份隊(duì)列就能知道/獲取到路由失敗的消息。通常情況下備份交換的Type應(yīng)該設(shè)置為 fanout。
  • 配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=futao
spring.rabbitmq.password=123456789
spring.rabbitmq.virtual-host=/tech-sharing

# 當(dāng)exchange無(wú)法找到任何一個(gè)合適的queue時(shí),將消息return給生產(chǎn)者
spring.rabbitmq.template.mandatory=false
# 必須設(shè)置為true,否則消息消息路由失敗也無(wú)法觸發(fā)Return回調(diào)
spring.rabbitmq.publisher-returns=false
  • 注意: 使用備份交換機(jī)模式,mandatory將無(wú)效,即就算mandatory設(shè)置為false,路由失敗的消息同樣會(huì)被投遞到綁定的備份交換機(jī)。
  • 正常業(yè)務(wù)交換機(jī)(不綁定隊(duì)列,使得消息一定會(huì)路由失敗)
/**
 * 業(yè)務(wù)交換機(jī)
 *
 * @return */ @Bean public Exchange noMatchQueueExchange() { return ExchangeBuilder
            .topicExchange(NoMatchQueueAlternateExchange.EXCHANGE_NAME)
            .durable(true) // 綁定備份交換機(jī) .alternate(X_ALTERNATE)
            .build();
}
  • 備份交換機(jī)/隊(duì)列/綁定
/**
 * 備份隊(duì)列
 *
 * @return */ @Bean public Queue alternateQueue() { return QueueBuilder
            .durable("Q_ALTERNATE")
            .build();
} /**
 * 備份交換機(jī)
 *
 * @return */ @Bean public Exchange alternateExchange() { return ExchangeBuilder
            .fanoutExchange(X_ALTERNATE)
            .durable(true)
            .build();
} /**
 * 備份綁定
 *
 * @param alternateExchange
 * @param alternateQueue
 * @return */ @Bean public Binding alternateBinding(Exchange alternateExchange, Queue alternateQueue) { return BindingBuilder
            .bind(alternateQueue)
            .to(alternateExchange)
            .with("")
            .noargs();
}
  • 消息投遞
/**
 * 正常業(yè)務(wù)交換機(jī)
 */ public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE_ALTERNATE"; @Autowired private RabbitTemplate rabbitTemplate; /**
 * 發(fā)送消息
 */ @PostConstruct public void send() {
    log.info("發(fā)送消息");
    Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus());
    Message message = MessageBuilder
            .withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8))
            .setContentEncoding(StandardCharsets.UTF_8.displayName())
            .setContentType(MessageProperties.CONTENT_TYPE_JSON)
            .build();
    rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);
}
  • 結(jié)果是消息被路由到備份交換機(jī)的備份隊(duì)列

RabbitMQ消息路由失敗的處理方案(回調(diào)與備份交換機(jī)AE)

  • 且: 如果你同時(shí)使用了兩種方式,即(mandatory為true+Listener監(jiān)聽(tīng))和(備份交換機(jī)AlternateExchange),消息將只會(huì)路由到備份交換機(jī),不會(huì)Return回生產(chǎn)者。


# 在原生RabbitMQ-client中演示這一過(guò)程:
@Slf4j public class AeTest { /**
     * 獲取Channel
     */ private static final Channel CHANNEL = MqChannelUtils.getChannel(); /**
     * 備份交換機(jī)
     */ private static final String X_AE = "X_AE"; /**
     * 備份交換機(jī)綁定的隊(duì)列
     */ private static final String Q_AE = "Q_AE"; /**
     * 正常業(yè)務(wù)的交換機(jī)
     */ private static final String X_1 = "X_1"; public static void main(String[] args) throws IOException { // 定義備份交換機(jī)-其實(shí)也是一個(gè)正常的交換機(jī) CHANNEL.exchangeDeclare(X_AE, BuiltinExchangeType.FANOUT, true); // 定義備份隊(duì)列 CHANNEL.queueDeclare(Q_AE, true, false, false, null); // 綁定備份 CHANNEL.queueBind(Q_AE, X_AE, "");

        HashMap arguments = new HashMap<>(); // 綁定的備份交換機(jī) arguments.put("alternate-exchange", X_AE); // 定義交換機(jī) CHANNEL.exchangeDeclare(X_1, BuiltinExchangeType.TOPIC, false, false, arguments); // 添加監(jiān)聽(tīng)器,看看是否還會(huì)return消息 CHANNEL.addReturnListener(new ReturnCallback() { @Override public void handle(Return returnMessage) {
                log.error("消息被退回{}", returnMessage);
            }
        }); // 嘗試向交換機(jī)發(fā)送消息(無(wú)法路由)- mandatory參數(shù)無(wú)效 CHANNEL.basicPublish(X_1, "", false, false, new AMQP.BasicProperties(), "阿依古麗".getBytes(StandardCharsets.UTF_8));
    }
}
  • 兩個(gè)交換機(jī),正常的交換機(jī)X_1和備份交換機(jī)X_AE

RabbitMQ消息路由失敗的處理方案(回調(diào)與備份交換機(jī)AE)

RabbitMQ消息路由失敗的處理方案(回調(diào)與備份交換機(jī)AE)

RabbitMQ消息路由失敗的處理方案(回調(diào)與備份交換機(jī)AE)

  • 備份交換機(jī)綁定的隊(duì)列已經(jīng)接收到了路由失敗的消息

RabbitMQ消息路由失敗的處理方案(回調(diào)與備份交換機(jī)AE)

  • 其他要注意的點(diǎn):

    • 備份交換機(jī)的Type設(shè)置為fanout比較合適,這樣可以忽略RoutingKey,避免備份交換機(jī)又路由失敗。
    • 被投遞到備份交換機(jī)的RoutingKey為消息投遞到MQ時(shí)的原始RoutingKey,不會(huì)變,這一點(diǎn)在其他場(chǎng)景下也是一樣的。
    • 使用備份交換機(jī)模式,mandatory將無(wú)效,即就算mandatory設(shè)置為false,路由失敗的消息同樣會(huì)被投遞到綁定的備份交換機(jī)。

# 源代碼

https://gitee.com/FutaoSmile/tech-sharing-mq


免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺(tái)僅提供信息存儲(chǔ)服務(wù)。文章僅代表作者個(gè)人觀點(diǎn),不代表本平臺(tái)立場(chǎng),如有問(wèn)題,請(qǐng)聯(lián)系我們,謝謝!

本站聲明: 本文章由作者或相關(guān)機(jī)構(gòu)授權(quán)發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點(diǎn),本站亦不保證或承諾內(nèi)容真實(shí)性等。需要轉(zhuǎn)載請(qǐng)聯(lián)系該專欄作者,如若文章內(nèi)容侵犯您的權(quán)益,請(qǐng)及時(shí)聯(lián)系本站刪除( 郵箱:macysun@21ic.com )。
換一批
延伸閱讀
關(guān)閉