如何保证消息可靠性问题(RabbitMQ消息队列)?
本文介绍:消息可靠性保障需从发送者、MQ、消费者三方面着手:发送者通过重连机制和确认机制(PublisherConfirm/Return)确保消息投递;MQ通过数据持久化和LazyQueue优化存储;消费者采用确认机制(ack/nack/reject)、失败重试及幂等处理(唯一约束/业务判断)防止重复消费。兜底方案使用延迟消息(死信队列/DelayExchange插件)处理异常情况,形成完整的可靠性保障体系。
如何保证消息可靠性
保证消息可靠性要从发送者、MQ、消费者这三个方面考虑。
发送者的可靠性
发送者重连机制
有时候可能会因为网络的问题,导致发送者没能连接上MQ,这时就需要对发送者进行发送者重连,可以通过开启Spring AMQP提供的失败后重连的配置,因为它默认是false关闭的。
spring:
rabbitmq:
connection-timeout: 1s #设置MO的连接超时时间
template:
retry:
enabled: true #开启超时重试机制
initial-interval: 200ms #失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数
max-attempts: 3 #最大重试次数
不过呢,这个重试机制是阻塞的,它在重试等待的过程中当前线程会阻塞,从而影响其它业务进行的性能。因此,如果追求高性能,可以不开启重试机制;如果一定要保证发送者消息的可靠性,可以开启重试,但是一定要合理设置重连等待时长和次数,等待时间不要太长了。当然了,也可以采用异步线程的方式,我可以将发送消息的代码放在单独线程进行,这样在触发重试时就不会影响阻塞主线程执行的任务了。
发送者确认机制
Spring AMOP提供了Publisher Confirm和Publisher Return两种确认机制。开启确认机制后,当发送者发送消息给MQ后,MQ就会返回确认结果给发送者。
返回的结果有以下3种情况:
1、消息到达了MQ,但是通过交换机进行路由失败时(可能是没绑定到队列或绑定路径错误导致路由失败),此时会通过Publisher Return返回路由异常原因,然后通过Publisher Confirm返回ACK,告知投递成功
2、临时消息到达了MQ,并且入队成功,返回ACK,告知投递成功;否则返回NACK,告知投递失败
3、持久消息到达了MQ,入队成功,将消息写入磁盘完成持久化,返回ACK,告知投递成功;否则返回NACK,告知投递失败

ACK (Acknowledge character)是确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符。 表示发来的数据已确认接收无误。 在TCP/IP协议中,如果接收方成功的接收到数据,那么会回复一个ACK数据。 通常ACK信号有自己固定的格式,长度大小,由接收方回复给发送方。
如何开启确认机制来实现发送者确认?
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型,默认是none关闭的
publisher-returns: true #开publisher return机制
publisher-confirm-type有三种模式:
1、none:关闭confirm机制(默认)
2、simple:同步阻塞等待MQ的回执消息
3、correlated:MQ异步回调方式返回回执消息
ReturnCallback:每个RabbitTemplate只能配置一个ReturnCallback,因此在项目启动的时候配置就可以了,而不是在每发一次消息进行配置
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig{
private final RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(new RabbitTemplate.Returnscallback(){
@Override
public void returnedMessage(ReturnedMessage returned){
log.error("触发return callback"):
}
});
}
}
ConfirmCallback:发送消息,在每次发消息时都要进行一次配置,指定消息ID(识别哪个消息的关键标识)、消息ConfirmCallback
@Slf4j
@SpringBootTest
public class RabbitMQPublisherConfirmTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testPublisherConfirm() throws InterruptedException {
// 1.创建CorrelationData
CorrelationData cd = new CorrelationData();
// 2.给Future添加ConfirmCallback
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
// 2.1.Future发生异常时的处理逻辑,基本不会触发
log.error("Handle message ack fail", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
if (result.isAck()) { // result.isAck(), boolean类型, true代表ack回执, false 代表 nack回执
log.debug("发送消息成功,收到 ack!");
} else { // result.getReason(), String类型,返回nack时的异常描述
log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
}
}
});
// 3.发送消息
rabbitTemplate.convertAndSend("hmall.direct", "redl", "hello", cd);
}
}
MQ的可靠性
默认情况下,RabbitMQ会将接收到的信息保存在内存中。有时候可能会出现下面两个问题
1、MQ宕机,内存中的消息会丢失
2、内存空间有限,当消费者故障或处理过慢时,会导致消息积压,MQ阻塞
如果还用重发的方法来解决,那MQ的性能会变差,因此要保证MQ的可靠性要从存储上解决。
数据持久化
对RabbitMQ提前进行数据持久化到磁盘,从以下3个方面进行持久化:
1、交换机持久化
2、队列持久化
3、消息持久化
Spring AMQP创建交换机和发消息的持久化默认是支持的,因此不需要额外做什么,只要对队列持久化额外处理。
进行了持久化,发的MQ消息在传统的队列是会在内存中保存一份,然后再在磁盘写一份,那这样每一条处理消息处理的耗时是不是就变长了,导致整体的一个并发能力是有点下降的。那要让接收到的消息直接写入磁盘,不需要存储到内存中,就需要用到LazyQueue。
LazyQueue(懒惰队列)
RabbitMQ从3.6.0版本开始,增加了LazyQueue的概念。在3.12版本后,所有队列都是Lazy Queue模式,无法更改。
LazyQueue是什么呢?它是 RabbitMQ 中的一种队列模式,它通过尽可能将消息存储在磁盘而不是内存中,来优化消息的持久化处理,从而解决内存压力和大量消息堆积的问题。
LazyQueue的两个特性:
1、接收到消息后直接存入磁盘,不再存储到内存
2、消费者要消费消息时才会从磁盘中读取并加载到内存(可以提前缓存部分消息到内存,最多2048条)
声明LazyQueue的三种方式
方式1:通过Bean声明:
@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue")
.lazy()// 开启Lazy模式
.build();
}
方式2:通过注解形式声明:
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name ="x-queue-mode",value = "lazy" )
))
public void listenLazyQueue(string msg){
log.info("接收到 lazy.queue的消息:{}",msg);
}
方式3:通过控制台的方式声明:

消费者的可靠性
确认机制
消费者确认机制是为了确认消费者是否成功处理消息。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态:
- ack:成功处理消息,RabbitMQ从队列中删除该消息
- nack:消息处理失败,RabbitMQ要再次向队列发送消息
- reject:消息处理失败并拒绝接收该消息,RabbitMQ从队列中删除该消息
Spring AMQP有三种消息确认方式:
1、不处理(none):即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
2、手动模式(manual):需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
3、自动模式(auto):Spring AMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack;当业务出现异常时,根据异常判断返回不同结果:
- 如果是业务异常,会自动返回nack
- 如果是消息处理或校验异常,自动返回reject
配置方式:
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: none
失败重试机制
Spring AMOP提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限的重新投递消息,一直重新入队。通过在application.yaml文件中给消费者添加配置来开启重试机制:
spring:
rabbitmg:
listener:
simple:
prefetch: 1
retry:
enabled: true #开启消费者失败重试
initial-interval: 1000ms #初始的失败等待时长为1秒
multiplier: 1 #下次失败的等待时长倍数,下次等待时长=multiplier *last-interval
max-attempts: 3 #最大重试次数
stateless: true #true无状态:false有状态;如果业务中包含事务,这里改为false
但是,这种设置max-attempts最大重试次数耗尽时,失败重试机制默认会拒绝,并且将消息从队列中丢弃。
因此,要解决这个问题就要用到MessageRecoverer接口的另外两个实现类来解决:
1、ImmediateRequestMessageRecoverer在重试次数耗尽后,会立即返回nack,消息重新入队
2、RepublishMessageRecoverer在重试次数耗尽后,会将失败的消息投递到指定的交换机
配置失败重试处理策略:
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate , "direct" , "queue");
}
幂等处理
幂等是什么?就是做一次和做多次的效果是一样的。如,支付1元和重复支付,账户只扣1元(多扣的要退款)。
那为什么需要幂等呢?
因为消息可能重复:
1、网络问题导致确认没收到,MQ重新发送
2、消费者处理超时,MQ认为失败重新发送
3、手动重试操作
如何实现幂等?
方法1:据库唯一约束(或全局id作为唯一标识)
-- 创建订单处理记录表
CREATE TABLE order_process (
id BIGINT PRIMARY KEY,
order_id VARCHAR(50) UNIQUE, -- 唯一约束
status VARCHAR(20),
created_time TIMESTAMP
);
原理:1、第一次处理:插入成功,继续业务;2、第二次处理:插入失败(已存在),直接返回
方法2:基于业务逻辑判断来保证幂等性。
虽然对发送者、MQ、消费者都进行了消息的可靠性保证,但是没有绝对100%能够保证不会出现消息丢失,哪怕是99.9%,也还有0.1%的可能性发生丢失,因此我们还需要做一个兜底方案。
兜底方案:延迟消息
延迟消息是什么?发送者发送消息时指定一个时间,消费者不会立即收到消息,而是在指定时间到了才收到消息。
那怎么实现延迟消息呢?可以通过死信队列或使用延迟消息的插件来实现延迟消息方案。
死信队列
死信是什么?无法被正常投递的消息
消息成为死信的三种情况:
1、消息被拒绝:消费者说"我不要这个"
2、消息过期:超过生存时间(TTL)
3、队列满了:装不下了
死信队列的作用:
1、收集问题消息:统一处理异常情况
2、实现延迟消息:通过"消息过期"机制
3、问题排查:分析为什么消息成为死信
工作原理:
1. 创建一个"延迟队列",设置消息30分钟后过期
2. 消息进入延迟队列,等待30分钟
3. 30分钟后,消息过期,自动转到"死信队列"
4. 消费者从死信队列获取消息并处理
一个普通的交换机和一个绑定了死信交换机的普通队列,然后给死信交换机绑定死信队列,绑定消费者

延迟消息插件
DelayExchange插件:RabbitMQ 提供的延迟消息插件,可以简化延迟消息的处理过程。该插件通过设计一种特殊的交换机,当消息投递到这种交换机时,它能够暂存一段时间,直到达到设定的延迟时间后再将消息投递到相应的队列。
DelayExchange在消息发布时检查是否设置了x-delay头,如果有,则会将消息存储在内部数据库中,直到延迟时间到达后才转发到目标队列。
Docker安装方式:
1、下载插件的地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
2、上传插件到mq:
docker cp rabbitmq_delayed_message_exchange-3.8.0.ez <container_id>:/plugins
3、安装启用插件:
docker exec -it <container_id> rabbitmq-plugins enable rabbitmq_delayed_message_exchange
4、重启容器
docker restart <container_id>
然后通过注解的方式声明队列、交换机、绑定队列交换机并设置交换机为延迟交换机来实现延迟消息:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayQueue(String msg) {
log.info("接收到延迟消息:{}" , msg);
}
基于Bean的方式声明队列、交换机、绑定队列交换机并设置交换机为延迟交换机来实现延迟消息:
@Configuration
public class DirectConfiguration {
//设置延迟交换机
@Bean
public DirectExchange delayExchange() {
return ExchangeBuilder
.directExchange("delay.direct")
.delayed()
.durable(true)
.build();
}
//声明队列
@Bean
public Queue delayedQueue() {
return new Queue("delay.queue");
}
//绑定队列与交换机
@Bean
public Binding delayQueueBinding() {
return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
}
}
然后,发送消息时需要通过消息头x-delay设置过期时间:
@Test
void testPublisherDelayMessage(){
// 1.创建消息
String message ="延迟消息";
//2.发送消息,利用消息后置处理器MessagePostProcessor添加消息头
rabbitTemplate.convertAndSend("delay,direct","delay", message, new MessagePostProcessor(){
@Override
public Message postProcessMessage(Message message)throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}更多推荐


所有评论(0)