本文介绍:消息可靠性保障需从发送者、MQ、消费者三方面着手:发送者通过重连机制和确认机制(PublisherConfirm/Return)确保消息投递;MQ通过数据持久化和LazyQueue优化存储;消费者采用确认机制(ack/nack/reject)、失败重试及幂等处理(唯一约束/业务判断)防止重复消费。兜底方案使用延迟消息(死信队列/DelayExchange插件)处理异常情况,形成完整的可靠性保障体系。

如何保证消息可靠性

保证消息可靠性要从发送者、MQ、消费者这三个方面考虑。

发送者的可靠性

发送者重连机制

有时候可能会因为网络的问题,导致发送者没能连接上MQ,这时就需要对发送者进行发送者重连,可以通过开启Spring AMQP提供的失败后重连的配置,因为它默认是false关闭的。

spring:
  rabbitmq:
    connection-timeout:	1s	#设置MO的连接超时时间
    template:
      retry:
        enabled:	true #开启超时重试机制
        initial-interval:	200ms	#失败后的初始等待时间
        multiplier:	1 	# 失败后下次的等待时长倍数
        max-attempts:	3	#最大重试次数

不过呢,这个重试机制是阻塞的,它在重试等待的过程中当前线程会阻塞,从而影响其它业务进行的性能。因此,如果追求高性能,可以不开启重试机制;如果一定要保证发送者消息的可靠性,可以开启重试,但是一定要合理设置重连等待时长和次数,等待时间不要太长了。当然了,也可以采用异步线程的方式,我可以将发送消息的代码放在单独线程进行,这样在触发重试时就不会影响阻塞主线程执行的任务了。

发送者确认机制

Spring AMOP提供了Publisher ConfirmPublisher Return两种确认机制。开启确认机制后,当发送者发送消息给MQ后,MQ就会返回确认结果给发送者。

返回的结果有以下3种情况:

1、消息到达了MQ,但是通过交换机进行路由失败时(可能是没绑定到队列或绑定路径错误导致路由失败),此时会通过Publisher Return返回路由异常原因,然后通过Publisher Confirm返回ACK,告知投递成功

2、临时消息到达了MQ,并且入队成功,返回ACK,告知投递成功;否则返回NACK,告知投递失败

3、持久消息到达了MQ,入队成功,将消息写入磁盘完成持久化,返回ACK,告知投递成功;否则返回NACK,告知投递失败

ACK (Acknowledge character)是确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符。 表示发来的数据已确认接收无误。 在TCP/IP协议中,如果接收方成功的接收到数据,那么会回复一个ACK数据。 通常ACK信号有自己固定的格式,长度大小,由接收方回复给发送方。

如何开启确认机制来实现发送者确认?

spring:
  rabbitmq:
    publisher-confirm-type:	correlated	# 开启publisher confirm机制,并设置confirm类型,默认是none关闭的
    publisher-returns:	true	#开publisher return机制

publisher-confirm-type有三种模式:

1、none:关闭confirm机制(默认)

2、simple:同步阻塞等待MQ的回执消息

3、correlated:MQ异步回调方式返回回执消息

ReturnCallback:每个RabbitTemplate只能配置一个ReturnCallback,因此在项目启动的时候配置就可以了,而不是在每发一次消息进行配置

@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig{
    private final RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.Returnscallback(){
            @Override
            public void returnedMessage(ReturnedMessage returned){
                log.error("触发return callback"):
            }
        });
    }
}

ConfirmCallback:发送消息,在每次发消息时都要进行一次配置,指定消息ID(识别哪个消息的关键标识)、消息ConfirmCallback

@Slf4j
@SpringBootTest
public class RabbitMQPublisherConfirmTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void testPublisherConfirm() throws InterruptedException {
        // 1.创建CorrelationData
        CorrelationData cd = new CorrelationData();
        
        // 2.给Future添加ConfirmCallback
        cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                // 2.1.Future发生异常时的处理逻辑,基本不会触发
                log.error("Handle message ack fail", ex);
            }
            
            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
                if (result.isAck()) { // result.isAck(), boolean类型, true代表ack回执, false 代表 nack回执
                    log.debug("发送消息成功,收到 ack!");
                } else { // result.getReason(), String类型,返回nack时的异常描述
                    log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
                }
            }
        });
        
        // 3.发送消息
        rabbitTemplate.convertAndSend("hmall.direct", "redl", "hello", cd);
    }
}

MQ的可靠性

默认情况下,RabbitMQ会将接收到的信息保存在内存中。有时候可能会出现下面两个问题

1、MQ宕机,内存中的消息会丢失

2、内存空间有限,当消费者故障或处理过慢时,会导致消息积压,MQ阻塞

如果还用重发的方法来解决,那MQ的性能会变差,因此要保证MQ的可靠性要从存储上解决。

数据持久化

对RabbitMQ提前进行数据持久化到磁盘,从以下3个方面进行持久化:

1、交换机持久化

2、队列持久化

3、消息持久化

Spring AMQP创建交换机和发消息的持久化默认是支持的,因此不需要额外做什么,只要对队列持久化额外处理。

进行了持久化,发的MQ消息在传统的队列是会在内存中保存一份,然后再在磁盘写一份,那这样每一条处理消息处理的耗时是不是就变长了,导致整体的一个并发能力是有点下降的。那要让接收到的消息直接写入磁盘,不需要存储到内存中,就需要用到LazyQueue。

LazyQueue(懒惰队列)

RabbitMQ从3.6.0版本开始,增加了LazyQueue的概念。在3.12版本后,所有队列都是Lazy Queue模式,无法更改。

LazyQueue是什么呢?它是 RabbitMQ 中的一种队列模式,它通过尽可能将消息存储在磁盘而不是内存中,来优化消息的持久化处理,从而解决内存压力和大量消息堆积的问题。

LazyQueue的两个特性:

1、接收到消息后直接存入磁盘,不再存储到内存

2、消费者要消费消息时才会从磁盘中读取并加载到内存(可以提前缓存部分消息到内存,最多2048条)

声明LazyQueue的三种方式

方式1:通过Bean声明:

@Bean
public Queue lazyQueue(){
return QueueBuilder
        .durable("lazy.queue")
        .lazy()// 开启Lazy模式
        .build();
}

方式2:通过注解形式声明:

@RabbitListener(queuesToDeclare = @Queue(
    name = "lazy.queue",
    durable = "true",
    arguments = @Argument(name ="x-queue-mode",value = "lazy" )
))
public void listenLazyQueue(string msg){
    log.info("接收到 lazy.queue的消息:{}",msg);
}

方式3:通过控制台的方式声明:

消费者的可靠性

确认机制

消费者确认机制是为了确认消费者是否成功处理消息。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ要再次向队列发送消息
  • reject:消息处理失败并拒绝接收该消息,RabbitMQ从队列中删除该消息

Spring AMQP有三种消息确认方式:

1、不处理(none):即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

2、手动模式(manual):需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活

3、自动模式(auto):Spring AMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack;当业务出现异常时,根据异常判断返回不同结果:

  • 如果是业务异常,会自动返回nack
  • 如果是消息处理或校验异常,自动返回reject

配置方式:

spring:
  rabbitmq:
    listener:
    simple:
      prefetch:	1
      acknowledge-mode:	none

失败重试机制

Spring AMOP提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限的重新投递消息,一直重新入队。通过在application.yaml文件中给消费者添加配置来开启重试机制:

spring:
  rabbitmg:
    listener:
      simple:
        prefetch: 1
        retry:
          enabled: true 	#开启消费者失败重试
          initial-interval: 1000ms	#初始的失败等待时长为1秒
          multiplier: 1		#下次失败的等待时长倍数,下次等待时长=multiplier *last-interval
          max-attempts: 3		#最大重试次数
          stateless: true 	#true无状态:false有状态;如果业务中包含事务,这里改为false

但是,这种设置max-attempts最大重试次数耗尽时,失败重试机制默认会拒绝,并且将消息从队列中丢弃。

因此,要解决这个问题就要用到MessageRecoverer接口的另外两个实现类来解决:

1、ImmediateRequestMessageRecoverer在重试次数耗尽后,会立即返回nack,消息重新入队

2、RepublishMessageRecoverer在重试次数耗尽后,会将失败的消息投递到指定的交换机

配置失败重试处理策略:

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate , "direct" , "queue");
}

幂等处理

幂等是什么?就是做一次和做多次的效果是一样的。如,支付1元和重复支付,账户只扣1元(多扣的要退款)。

那为什么需要幂等呢?
因为消息可能重复:

1、网络问题导致确认没收到,MQ重新发送

2、消费者处理超时,MQ认为失败重新发送

3、手动重试操作

如何实现幂等?

方法1:据库唯一约束(或全局id作为唯一标识)

-- 创建订单处理记录表
CREATE TABLE order_process (
  id BIGINT PRIMARY KEY,
  order_id VARCHAR(50) UNIQUE,  -- 唯一约束
  status VARCHAR(20),
  created_time TIMESTAMP
);

原理:1、第一次处理:插入成功,继续业务;2、第二次处理:插入失败(已存在),直接返回

方法2:基于业务逻辑判断来保证幂等性。

虽然对发送者、MQ、消费者都进行了消息的可靠性保证,但是没有绝对100%能够保证不会出现消息丢失,哪怕是99.9%,也还有0.1%的可能性发生丢失,因此我们还需要做一个兜底方案。

兜底方案:延迟消息

延迟消息是什么?发送者发送消息时指定一个时间,消费者不会立即收到消息,而是在指定时间到了才收到消息。

那怎么实现延迟消息呢?可以通过死信队列或使用延迟消息的插件来实现延迟消息方案。

死信队列

死信是什么?无法被正常投递的消息

消息成为死信的三种情况:

1、消息被拒绝:消费者说"我不要这个"

2、消息过期:超过生存时间(TTL)

3、队列满了:装不下了

死信队列的作用

1、收集问题消息:统一处理异常情况

2、实现延迟消息:通过"消息过期"机制

3、问题排查:分析为什么消息成为死信

工作原理:

1. 创建一个"延迟队列",设置消息30分钟后过期

2. 消息进入延迟队列,等待30分钟

3. 30分钟后,消息过期,自动转到"死信队列"

4. 消费者从死信队列获取消息并处理

一个普通的交换机和一个绑定了死信交换机的普通队列,然后给死信交换机绑定死信队列,绑定消费者

延迟消息插件

DelayExchange插件:RabbitMQ 提供的延迟消息插件,可以简化延迟消息的处理过程。该插件通过设计一种特殊的交换机,当消息投递到这种交换机时,它能够暂存一段时间,直到达到设定的延迟时间后再将消息投递到相应的队列。

DelayExchange在消息发布时检查是否设置了x-delay头,如果有,则会将消息存储在内部数据库中,直到延迟时间到达后才转发到目标队列。

Docker安装方式:

1、下载插件的地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

2、上传插件到mq:

docker cp rabbitmq_delayed_message_exchange-3.8.0.ez <container_id>:/plugins

3、安装启用插件:

docker exec -it <container_id> rabbitmq-plugins enable rabbitmq_delayed_message_exchange

4、重启容器

docker restart <container_id>

然后通过注解的方式声明队列、交换机、绑定队列交换机并设置交换机为延迟交换机来实现延迟消息:

@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue", durable = "true"),
            exchange = @Exchange(name = "delay.direct", delayed = "true"),
            key = "delay"
    ))
    public void listenDelayQueue(String msg) {
        log.info("接收到延迟消息:{}" , msg);
    }

基于Bean的方式声明队列、交换机、绑定队列交换机并设置交换机为延迟交换机来实现延迟消息:

@Configuration
public class DirectConfiguration {
    //设置延迟交换机
    @Bean
    public DirectExchange delayExchange() {
        return ExchangeBuilder
                .directExchange("delay.direct")
                .delayed()
                .durable(true)
                .build();
    }
    //声明队列
    @Bean
    public Queue delayedQueue() {
        return new Queue("delay.queue");
    }
    //绑定队列与交换机
    @Bean
    public Binding delayQueueBinding() {
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
    }
}

然后,发送消息时需要通过消息头x-delay设置过期时间:

@Test
void testPublisherDelayMessage(){
    // 1.创建消息
    String message ="延迟消息";
    //2.发送消息,利用消息后置处理器MessagePostProcessor添加消息头
    rabbitTemplate.convertAndSend("delay,direct","delay", message, new MessagePostProcessor(){
        @Override
        public Message postProcessMessage(Message message)throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}
Logo

开源鸿蒙跨平台开发社区汇聚开发者与厂商,共建“一次开发,多端部署”的开源生态,致力于降低跨端开发门槛,推动万物智联创新。

更多推荐