MQ的一些常见问题

  • 消息可靠性问题:如何确保发送的消息至少被消费一次
  • 延迟消息问题:如何实现消息的延迟投递
  • 消息堆积问题:如何解决数百万消息堆积,无法及时消费的问题
  • 高可用问题:如何避免单点的MQ故障而导致的不可用问题

一、消息可靠投递:怎么确保消息不丢失?

想象你寄快递:如果快递丢了,你肯定不乐意。RabbitMQ 的消息也一样,从发送到消费,任何环节都可能丢消息。我们需要一套 “全链路保障” 机制。

消息从生产者发送到exchange,再到queue,再到消费者,有哪些导致消息丢失的可能性?

  • 发送时丢失
    • 生产者发送的消息未送达exchange
    • 消息到达exchange后未到达queue
  • MQ宕机, queue将消息丢失
  • consumer接收到消息后未消费就宕机

消息丢失风险全景图
在这里插入图片描述

RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

  • confirm 确认模式,消息从producer到exchange则会返回一个confirmCallback
  • return 退回模式,消息从exchange–> queue投递失败则会返回一个returnCallback。我们将利用这两个callback控制消息的可靠性投递

可靠性保障四重防护

1.1 生产者消息确认

生产者确认机制
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:

  • publisher-confirm,发送者确认
    ➢消息成功投递到交换机,返回ack
    ➢消息未投递到交换机,返回nack
  • publisher-return,发送者回执
    ➢消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。调用ReturnCallback

tip:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突
在这里插入图片描述

SpringAMQP实现生产者确认

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应message从RabbitMQ的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收;如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
1. 生产者确认机制:在application.yml开启确认

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启确认模式
    publisher-returns: true # 开启回退模式
    template:
      mandary: true

配置说明:

  • publish-confirm-type: 开启publisher-confirm,这里支持两种类型:
    • simple: 同步等待confirm结果,直到超时
    • correlated: 异步回调,定义ConfirmCallback, MQ返回结果时会回调这个ConfirmCallback
  • publish-returns: 开启publish-return功能,同样是基于callback机制, 不过是定义ReturnCallback
  • template.mandatory:定义消息路由失败时的策略。true, 则调用ReturnCallback; false: 则直接丢弃消息

每个RabbitTemplate只能配置一个ReturnCallback, 因此需要在项目启动过程中配置:

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @0verride
    public void setApplicationContext (ApplicationContext applicationContext) throws BeansException {
	  //获取RabbitTemplate
	  RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
	  //设置ReturnCallback, 退回回调:消息到了交换机但没到队列
	  rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
	  log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
	  replyCode, replyText, exchange, routingKey, message.toString()) ;
   });
   
     // 确认回调:消息是否到交换机
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
	    if (ack) {
	        System.out.println("消息已到交换机");
	    } else {
	        System.out.println("消息没到交换机,原因:" + cause);
	        // 重试发送
	    }
  });

}

方式二:

@Slf4j
@Configuration
@RequiredArgsConstructor
public class MqConfig {
	private final RabbitTemplate rabbitTemplate;
	
	@PostConstruct
	public void init(){
		rabbitTemplate.setReturnsCallback(returned -> {
			log.error ("监听到了消息return callback") ;
			log.debug("exchange: {}", returned. getExchange());
			log.debug("routingKey: {}", returned . getRoutingKey());
			log.debug("message: {}", returned . getMessage());
			log.debug("replyCode: {}", returned . getReplyCode());
			log.debug("replyText: {}", returned. getReplyText());
	});
}

1.2 消息持久化

数据持久化
mq中的交换机、队列、消息默认都是持久化的。消息的持久化是指先把消息写入内存,当内存中数据较多时才会考虑写入磁盘。

怎么做?

  • 交换机持久化:@Bean public Exchange exchange() { return new DirectExchange(“ex”, true, false); }
  • 队列持久化:@Bean public Queue queue() { return new Queue(“q”, true); }
  • 消息持久化MessageProperties.PERSISTENT_TEXT_PLAIN
    在这里插入图片描述

Lazy Queue

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列
惰性队列的特征如下:

  • 接收到消息后直接存入磁盘,不再存储到内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(可以提前缓存部分消息到内存,最多2048条)
    在3.12版本后,所有队列都是Lazy Queue模式,无法更改。
    设置方式
    1. 控制台方式
    在这里插入图片描述
    2. 代码方式
    在这里插入图片描述

消息接收效果
在这里插入图片描述

1.3 消费者消息确认

消费者确认
RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。
三种回执状态:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

而SpringAMQP则允许配置三种确认模式:
配置方式是修改application.yml文件,添加下面配置:

spring:
  rabbitmq:
      listener:  #消费者
          simple:
             prefetch: 1 #每次只能获取一条信息,处理完成才能获取下一个信息(默认是无限个
             acknowledge-mode: none #none,关闭ack;manual,手动ack;auto:自动ack
  • auto(自动,推荐):Spring 判断(spring 监测listener代码是否出现异常),处理成功→ack(MQ 删除消息);处理失败→nack(MQ 重试)。
  • manual(手动):自己调用channel.basicAck()确认,适合复杂业务。
  • none(关闭):MQ 认为收到就成功,可能丢消息,不推荐。

若是manual,需要在消费者代码中手动应答签收消息

@RabbitListener(queues = "queue")
public void listen(String object, Message message, Channel channel) {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    log.info("消费成功:{},消息内容:{}", deliveryTag, object);
    try {
        /**
         * 执行业务代码...
         * */
        channel.basicAck(deliveryTag, false);
    } catch (IOException e) {
        log.error("签收失败", e);
        try {
            //第三个参数:是否重回队列
            channel.basicNack(deliveryTag, false, true);
        } catch (IOException exception) {
            log.error("拒签失败", exception);
        }
    }
}

注意:若生产环境使用以上代码,一旦发生一次消息报错就会崩溃。因为basicNack方法的第三个参数代表是否重回队列,如果false消息会直接丢弃,相当于消息可靠性没有保障;若果是true,当发生消息报错后,这个消息会重回消息队列顶端,继续推送到消费端,通常代码的报错并不会因为重试就能解决,所以这个消息将会出现这种情况:继续被消费,继续报错,重回队列,继续被消费。。。。死循环

真实场景一般三种选择

  1. 当消费失败后将此消息存到Redis,记录消费次数,如果消费了三次还是失败,就丢弃此消息,记录日志落库保存
  2. 直接填false,不重回队列,记录日志、发送邮件等待开发手动处理
  3. 不启用手动ack,使用springboot提供的消息重试

1.3.1 消费失败重试机制

有时候会因为网络波动,出现消费者接连MQ失败的情况。
当消费者出现异常后,消息会不断requeue (重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列

spring:
   rabbitmq:
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: none
        retry: #消费者失败重试机制
          enabled: true #开启消费者失败重试
          initial-interval: 1000 #初始失败等待时长为1s
          multiplier: 3 #下次失败的等待时长倍数,下次等待时长=initial-interval*multiplier
          max-attempts: 3 #最大重试次数
          stateless: true【默认值】 #消息监听器是否启用无状态重试;true无状态,false有状态。

消费者代码

@RabbitListener(queues = "queue")
public void listen(String object, Message message, Channel channel) throws IOException {
    try {
        /**
         * 执行业务代码...
         * */
        int i = 1 / 0; //故意报错测试
    } catch (Exception e) {
        log.error("签收失败", e);
        /**
         * 记录日志、发送邮件、保存消息到数据库,落库之前判断如果消息已经落库就不保存
         * */
        throw new RuntimeException("消息消费失败");
    }
}

注意:一定要手动throw一个异常,因为springBoot触发重试是根据方法中发生未捕捉的异常来决定的。值得注意的是这个重试是SpringBoot提供的,重新执行消费者方法,而不是让RabbitMQ重新推送消息。

1.3.2 消费者失败消息处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

1.3.3 业务幂等性

业务幂等性
幂等是一个数学概念,用函数表达式来描述是这样的: f(x)= f(f(x))。在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的
幂等

  • 查询业务, 例如根据id查询商品
  • 删除业务,例如根据id删除商品
    非幂等
  • 用户下单业务,需要扣减库存
  • 用户退款业务,需要恢复余额
方案一:唯一消息id

是给每个消息都设置一个唯一id利用id区分是否是重复消息
①每一条消息都生成一个唯一id, 与消息一起投递给消费者。
②消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
③如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

@Bean
public MessageConverter messageConverter(){
	// 1.定义消息转换器
	Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter() ;
	// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
	jjmc.setCreateMessageIds(true) ;
	return jjmc;
}

在这里插入图片描述
获取消息ID

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(Message message) {
	log.info("监听到simple.queue的消息: ID: [{}] ", message. getMessageProperties().getMessageId() ;
	log.info("监听到simple . queue的消息: [{}] ",new String(message.getBody());
	throw new RuntimeException("我是故意的! ");
}
	
	
方案二:业务判断

方案二,是结合业务逻辑,基于业务本身做判断。以我们做的余额支付业务为例:
在这里插入图片描述

UPDATE orders SET status = 'PAID' 
WHERE id = #{orderId} AND status = 'UNPAID'

二、延迟消息:怎么实现“15分钟后取消订单”?

2.1 延迟消息

  • 延迟消息:发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
  • 延迟任务:设置在一定时间之后才执行的任务
    在这里插入图片描述

比如用户下单后,30 分钟未支付就自动取消,这就是延迟消息。RabbitMQ 有两种实现方式:

2.2 死信交换机:利用 “过期消息” 实现延迟

死信(dead letter):满足以下条件的消息会变成 “死信”,可以被转发到 “死信交换机”:

  • 消息过期没人要(比如设置 15 分钟过期)。
  • 消费者拒绝接收(使用basic.reject或 basic.nack声明消费失败)且不重新入队(requeue参数设置为false)。
  • 队列满了,最早的消息被挤出去。
    如果该队列配置了dead-letter-exchange属性,指定了个一交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。
    给队列设置dead-letter-routing-key属性,设置死信交换机与死信队列的RoutingKey
    在这里插入图片描述

TTL
TTL,也就是Time-To-Live。 如果一个队列中的消息TTL结束仍未消费,则会变为死信,tt超时分为两种情况:

  • 消息所在的队列设置了存活时间
  • 消息本身设置了存活时间
    在这里插入图片描述
    tip:若队列和消息都设置了存活时间,则存活时间取两者较小值

实现订单取消案例:

  1. 声明 “普通队列”(接收下单消息),设置 15分钟过期,绑定到 “死信交换机”。
  2. 声明 “死信队列”(接收过期的死信),消费者监听死信队列,处理取消订单。
// 配置类
@Configuration
public class DelayConfig {
    // 死信交换机
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange("dead.letter.exchange");
    }
    // 死信队列(处理取消订单)
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable("dead.letter.queue").build();
    }
    // 绑定死信交换机和队列
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dead.key");
    }

    // 普通队列(接收下单消息,15分钟过期)
    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable("order.queue")
                .ttl(15 * 60 * 1000) // 15分钟过期
                .deadLetterExchange("dead.letter.exchange") // 过期后转发到死信交换机
                .deadLetterRoutingKey("dead.key") // 死信路由键
                .build();
    }
}

生产者发送下单消息到order.queue:

@Test
public void sendOrder() {
    String orderId = "123456";
    rabbitTemplate.convertAndSend("", "order.queue", "下单消息:" + orderId);
}

消费者监听死信队列,处理取消订单:

@RabbitListener(queues = "dead.letter.queue")
public void handleCancelOrder(String msg) {
    System.out.println("取消订单:" + msg); // 执行取消逻辑
}

延迟队列
利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式
延迟队列的使用场景包括:

  • 延迟发送短信
  • 用户下单,如果用户在15分钟内未支付,则自动取消
  • 预约工作会议,20分钟后自动通知所有参会人员

2.3 延迟队列插件

RabbitMQ 官方提供了rabbitmq_delayed_message_exchange插件,直接支持延迟消息,不用死信交换机。
在这里插入图片描述

这个插件可以将普通交换机改造为支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后在投递到队列。

步骤:

  1. 安装插件(Docker 安装参考官方文档)。
# 安装插件
docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  1. 声明 “延迟交换机”(类型x-delayed-message)。
  2. 发送消息时指定延迟时间。
// 延迟交换机
@Bean
public CustomExchange delayExchange() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct"); // 延迟交换机类型
    return new CustomExchange("delay.exchange", "x-delayed-message", true, false, args);
}

// 延迟队列(处理取消订单)
@Bean
public Queue delayQueue() {
    return QueueBuilder.durable("delay.queue").build();
}

// 绑定
@Bean
public Binding delayBinding() {
    return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay.key").noargs();
}

// 发送延迟消息(15分钟后投递)
@Test
public void sendDelay() {
    rabbitTemplate.convertAndSend("delay.exchange", "delay.key", "下单消息", message -> {
        message.getMessageProperties().setDelay(15 * 60 * 1000); // 延迟15分钟
        return message;
    });
}

三、惰性队列:消息堆积了怎么办?

消息堆积问题
生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。
普通队列像 “桌上的文件”,堆多了会乱;惰性队列像 “抽屉里的文件”,堆再多也稳定。

惰性队列特点:

  • 消息直接存磁盘(普通队列先存内存,满了才存磁盘)。
  • 消费者取消息时才从磁盘读入内存。
  • 支持百万级消息堆积,适合高并发场景。
    在这里插入图片描述

怎么用?
声明队列时指定x-queue-mode: lazy:

@Bean
public Queue lazyQueue() {
    return QueueBuilder.durable("lazy.queue")
            .withArgument("x-queue-mode", "lazy") // 惰性队列
            .build();
}

注意:RabbitMQ 3.12 + 版本后,所有队列默认是惰性队列,不用额外配置。

解决消息堆积有三种思路

  1. 增加消费者
@RabbitListener(queues = "busy.queue", concurrency = "5")
public void handleMessage(String msg) {
    // 多线程处理
}
  1. 批量消费
@RabbitListener(queues = "batch.queue")
public void handleBatch(List<Message> messages) {
    // 批量处理逻辑
}
  1. 提升消费速度
    • 优化业务逻辑
    • 异步处理
    • 缓存预热

惰性队列的优点有哪些?

  • 基于磁盘存储,消息上限高
  • 没有间歇性的page-out,性能比较稳定

惰性队列的缺点有哪些?

  • 基于磁盘存储,消息时效性会降低
  • 性能受限于磁盘的IO

四、高可用,MQ集群:怎么避免MQ单点故障

如果只有一个 MQ 节点,宕机后整个系统就没法收发消息了。需要集群保证高可用,RabbitMQ 有 3 种集群模式:

集群模式 特点 适合场景
普通集群 多节点分工,交换机 / 队列元信息共享,消息存在各自节点。某节点宕机,其消息丢失。 并发高,消息不重要
镜像集群 主从备份,消息同步到所有节点。主节点宕机,从节点顶上。 消息重要,需高可用
仲裁队列 基于 Raft 协议,主从强一致,配置简单(3.8 + 支持)。 消息极重要,追求可靠性

RabbitMQ的是基于Erlang语言编写,而Erlang又是一个面向并发的语言,天然支持集群模式。RabbitMQ的集群有两种模式:

  • 普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。
  • 镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。
    镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。因此在RabbitMQ的3.8版本以后,推出了新的功能:仲裁队列来代替镜像集群,底层采用Raft协议确保主从的数据一致性。

4.1 普通集群

普通集群,或者叫标准集群(classic cluster) ,具备下列特征:

  • 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
  • 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
  • 队列所在节点宕机,队列中的消息就会丢失
    在这里插入图片描述

4.2 镜像集群

镜像集群:本质是主从模式,具备下面的特征:

  • 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
  • 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。
  • 一个队列的主节点可能是另一个队列的镜像节点
  • 所有操作都是主节点完成,然后同步给镜像节点
  • 主宕机后,镜像节点会替代成新的主
    在这里插入图片描述

4.3 仲裁队列

仲裁队列:仲裁队列是3.8版本以后才有的新功能,用来替代镜像队列,具备下列特征:

  • 与镜像队列一样,都是主从模式,支持主从数据同步
  • 使用非常简单,没有复杂的配置
  • 主从同步基于Raft协议,强一致
    在这里插入图片描述

推荐用仲裁队列(简单可靠)
声明队列时指定quorum类型:

@Bean
public Queue quorumQueue() {
    return QueueBuilder.durable("quorum.queue") //持久化
            .quorum() // 仲裁队列
            .build();
}

SpringAMQP连接集群,只需要在yaml中配置即可:

spring:
	rabbitmq :
		addresses: 192. 168.150.105:8071,192. 168.150.105:8072,192.168. 150.105: 8073
		username: i tcast
		password: 123321
		virtual-host: /

总结:RabbitMQ 高级特性核心

  • 可靠投递:生产者确认 + 持久化 + 消费者确认 + 重试兜底,确保消息不丢。
  • 延迟消息:用死信交换机或延迟插件,实现订单取消等场景。
  • 惰性队列:解决消息堆积,适合高并发。
  • 集群 / 仲裁队列:保证 MQ 高可用,避免单点故障。
  • JSON 序列化:消息清晰,体积小。

五、常见问题解答

Q:如何选择集群模式?

  • 普通集群:高吞吐需求,可容忍数据丢失
  • 镜像集群:需要自动故障转移
  • 仲裁队列:强一致性要求(推荐)

Q:消息堆积时怎么办?

  • 优先使用惰性队列
  • 增加消费者实例
  • 优化消费者性能
  • 设置消息TTL自动过期

Q:如何保证顺序消费?

  • 单队列单消费者
  • 消息分组(RabbitMQ 3.8+)
  • 业务层排序标识

通过以上高级特性的合理运用,RabbitMQ可以支撑百万级消息处理,实现99.99%的可靠性保障!

Logo

开源鸿蒙跨平台开发社区汇聚开发者与厂商,共建“一次开发,多端部署”的开源生态,致力于降低跨端开发门槛,推动万物智联创新。

更多推荐