RabbitMQ的工作流程

img

生产者和消费者通过建立连接和通道来与MQ进行消息的发送和接受。

生产者可以通过通道将消息发送到交换机,然后交换机负责进行路由操作(将信息分发到与该交换机绑定的队列中)。生产者也可以直接将消息发送到队列中。

消费者通过监听队列,一旦有消息发送到该队列,消费者便可以取出消息进行相应的处理。

访问mq的控制台

IP:15672可以访问控制台

image-20250116105557261

控制台访问不了如何排查

  • 查看有关控制台的插件是否安装

  • 查看15672端口是否开放

  • MQ服务是否启动

springboot整合rabbitmq

导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
修改配置文件
spring:
  application:
    name: test
  rabbitmq:
    host: 192.168.149.142
    port: 5672
    username: xumingyu
    password: xu690421
    virtualHost: /
七种模型
Hello World!

image-20250116124654707

通过java代码来创建queue,需要注意的是Queue是org.springframework.amqp.core.Queue包下的。该方式是通过创建Bean的方式来生成一个Queue的。

通过@Bean注解让队列交给spring来管理,这样我们可以在springboot的其他地方来注入该队列来使用。

package com.example.mqtest.config;
​
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Configuration;
​
​
@Configuration
public class RabbitmqConfig {
    //定义消息队列的名字
    public static final String QUEUE_NAME = "my.queue";
​
    //通过bean来创建一个消息队列
    @Bean
    public Queue queue(){
        return new Queue(QUEUE_NAME,true);
    }
}

这是创建队列的方法,传入的参数分别是队列的名称,队列是否持久化。

new Queue(String name, boolean durable)

创建消费者

注意导入的是哪个包。 Channel导入的是com.rabbitmq.client.Channel

@RabbitListener用来声明该消费者监听哪一个队列

package com.example.mqtest;
​
import com.example.mqtest.config.RabbitmqConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
@Component
public class Consummer {
    //监听my.queue队列
    @RabbitListener(queues = {RabbitmqConfig.QUEUE_NAME})
    public void receiveHelloQueueMessage(String msg, Message message, Channel channel) {
        System.out.println("消费者收到消息:"+msg);
    }
}
方法参数:
String msg:消息内容,自动完成反序列化
Message message:原始消息对象,包含消息属性
Channel channel:RabbitMQ 通道,用于手动确认等操作

创建生产者

rabbitTemplate.convertAndSend()使用该方法进行消息的发送,第一个参数用来指定发送的交换机,第二个参数是发送队列的名字,第三个参数是发送消息的内容。

@Resource
    private RabbitTemplate rabbitTemplate;
​
    @PostMapping("/sender/hello/{message}")
    public String senderHello(@PathVariable String message) {
        /**
         * 参数说明
         * exchnage: 交换机,默认交换机指定为“”即可
         * routingKey :发送消息的路由键,该模式下使用队列名即可
         * message:消息的内容
         */
        rabbitTemplate.convertAndSend("", RabbitmqConfig.QUEUE_NAME,message);
        return "success";
    }
WorkQueue(工作队列)

工作队列模型就是多个消费者同时监听同一个队列。在实际的情况中,消费者处理消息的速度往往是比生产者发消息的速度要慢的,因此如果消息不能够及时地得到处理,那么消息就会在队列中进行堆积,造成消息堆积的问题。因此我们可以让多个消费者同时去进行处理消息。

image-20250116134245644

多个消费者同时监听同一个队列,默认采用的是轮询分配的算法,比如两个消费者监听Queue如何该Queue收到了10条消息,那么会将这十条消息平均分配给两个消费者就是两个消费者每人5条消息。

image-20250116143353000

由于不同的消费者处理消息的速度可能不同,因此我们需要更加合理地分配消息,比如处理能力强的消费者应该多分到消息,处理能力弱的消费者应该少分配到消息。

可以通过设置预取数量来解决该问题。就相当于每个消费者只能同时占有几条消息。

将该设置设为1,就可以保证每个消费者只要处理完手头里的工作,才能获取新的消息,不会出现消费者占用太多的消息无法及时地处理。

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1  # 限制每个消费者最多未确认的消息数量为1
fanout-广播模型

在上面的模型中,一条消息只能够被一个消费者处理,如果我们希望一条消息能够被多个消费者处理,可以采用广播模型,这种模型是基于交换机收到消息后,将消息同时转发给与其绑定的所有队列中去。

image-20250116144550702

配置交换机与队列

首先我们需要创建好交换机对象和队列对象并交给spring来管理,该交换机选择fanout类型,并指定持久化。然后我们需要将交换机与队列绑定好绑定关系。fanout类型的交换机就像一个大喇叭一样将接受到的消息分发给所有的队列。

package com.example.mqtest.config;
​
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
​
@Configuration
public class RabbitmqConfigFanout {
    //定义消息队列的名字
    public static final String QUEUE_1 = "queue1";
    public static final String QUEUE_2 = "queue2";
    public static final String EXCHANGE_FANOUT = "exchnage-fanout";
​
​
    @Bean
    public Exchange exchange(){
        //定义一个fanout类型的交换机,并指定持久化
        return ExchangeBuilder.fanoutExchange(EXCHANGE_FANOUT).durable(true).build();
    }
​
    @Bean
    public Queue queue1() {
        //创建一个队列队列,并指定队列的名字和持久化
        return new Queue(QUEUE_1,true);
    }
​
    @Bean
    public Queue queue2() {
        //创建一个队列队列,并指定队列的名字
        return new Queue(QUEUE_2,true);
    }
​
    @Bean
    public Binding bindingQueue1() {
        //fanout模式不指定routingkey
        return BindingBuilder
                .bind(queue1()).to(exchange()).with("").noargs();
    }
    @Bean
    public Binding bindingQueue2() {
        return BindingBuilder
                .bind(queue2()).to(exchange()).with("").noargs();
    }
}

消费者和生产者正常创建就好了。

Routing(路由模型)

fanout类型的交换机是将消息分发给所有绑定的队列,有时候我们希望不同的队列能够接受不同类型的消息。此时我们可以使用direct类型的交换机。

image-20250116145720683

该模型创建direct类型的交换机,在绑定交换机与队列的关系的时候需要指定该队列的routingkey

@Configuration
public class RabbitmqConfigDirect {
    //定义消息队列的名字
    public static final String QUEUE_DIRECT_1 = "direct_queue1";
    public static final String QUEUE_DIRECT_2 = "direct_queue2";
    public static final String EXCHANGE_DIRECT = "exchnage-direct";
​
​
    @Bean
    public Exchange exchange(){
        //定义一个direct类型的交换机,并指定持久化
        return ExchangeBuilder.directExchange(EXCHANGE_DIRECT).durable(true).build();
    }
​
    @Bean
    public Queue queue1() {
        //创建一个队列队列,并指定队列的名字
        return new Queue(QUEUE_DIRECT_1,true);
    }
​
    @Bean
    public Queue queue2() {
        //创建一个队列队列,并指定队列的名字
        return new Queue(QUEUE_DIRECT_2,true);
    }
​
    @Bean
    public Binding bindingQueue1() {
        return BindingBuilder
                .bind(queue1()).to(exchange()).with("pay").noargs();
    }
    @Bean
    public Binding bindingQueue2() {
        return BindingBuilder
                .bind(queue2()).to(exchange()).with("order").noargs();
    }
}
​

在生产者发送消息到该交换机的时候还需要指定routingkey这样就能够保证不同的消息交给不同的消费者来处理了。

    @PostMapping("/sender/direct/{message}")
    public String senderDirect(@PathVariable String message) {
        /**
         * 参数说明
         * exchnage: 交换机,使用自定义的交换机
         * routingKey :发送消息的路由键,fanout模式指定为“”
         * message:消息的内容
         */
        rabbitTemplate.convertAndSend(RabbitmqConfigDirect.EXCHANGE_DIRECT, "pay",message);
        rabbitTemplate.convertAndSend(RabbitmqConfigDirect.EXCHANGE_DIRECT, "order",message);
        return "success";
    }
​
Topics(通配符)

Topics模型和Routing模型基本一样,知识Topics模型在设置routingkey的时候可以使用通配符。该模型在定义routingkey的时候一般是使用多个单词,多个单词之间使用.来进行分隔。

通配符规则:

  1. #匹配一个或多个单词

  2. *匹配一个单词

签收机制

默认情况下我们采用的是自动签收,即mq将消息发送给消费者以后,消息自动被签收,然后从mq中删除。这种方式如果消费者在处理消息的过程中出现了异常情况,那么就会出现消息丢失的情况,因为消息在mq中已经被删除了。因此我们要选择手动签收,就是让消费者完成消息的处理以后再手动签收,然后消息才会在mq中被删除。

我们需要通过以下配置来决定签收的模式

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual #默认是 auto 自动签署

消费者手动签收

消费者签收消息需要先拿到消息的tag,然后依据tag来进行签收。

一般情况下我们都会选择不做批量签收

@RabbitListener(queues = {RabbitmqConfigDirect.QUEUE_DIRECT_2})
public void receiveDirect2(String msg, Message message, Channel channel) throws IOException {
    System.out.println("receiveDirect2消费者2收到消息:"+msg);
    //拿到消息的tag
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    //签收消息:指定消息的tags ,以及不做批量签收
    channel.basicAck(deliveryTag,false);
}
持久化

mq的消息默认存储在内存中,如果mq宕机则可能会存在消息丢失的风险,因此我们可以采用持久化的方式来解决该问题

交换机持久化

在创建交换机的时候将durable属性设置为true

@Bean
public Exchange exchange(){
    //定义一个direct类型的交换机,并指定持久化
    return ExchangeBuilder.directExchange(EXCHANGE_DIRECT).durable(true).build();
}
​
队列持久化
@Bean
public Queue queue1() {
    //创建一个队列队列,并指定队列的名字
    return new Queue(QUEUE_DIRECT_1,true);
}
消息持久化

当我们使用springboot发送一条消息的时候,会自动帮我们实现消息的持久化。

两个回调函数的介绍

在RabbitTemplate中提供了2个接口

  • ConfirmCallback : 消息投递到Brocker后触发回调,可以用来检测消息是否到达RabbitMQ

  • ReturnsCallback : 消息发送失败回调,比如队列路由失败

    img

开启上面两种回调函数需要做如下配置:

spring:
  rabbitmq:
    publisher-returns: true #开启returnCallback回调
    template:
      mandatory: true #消息会返回给发送者的回调,而不是丢弃
    publisher-confirm-type: correlated #开启ConfirmCallback 回调
​

通过实现 RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback来实现两个回调函数的业务处理。

@Component
@Slf4j
public class RabbitMQCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
​
    @Autowired
    private RabbitTemplate rabbitTemplate;
​
​
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        //ReturnedMessage 消息对象中包括:交换机,路由key,消息内容等
        log.info(returnedMessage.getExchange()
                +","+returnedMessage.getRoutingKey()
                +","+new String(returnedMessage.getMessage().getBody()));
        //把失败的消息再次发送
        rabbitTemplate.convertAndSend(returnedMessage.getExchange(),
                                      returnedMessage.getRoutingKey(),returnedMessage.getMessage());
    }
​
    /**
     * @param correlationData :消息的唯一标识
     * @param ack :消息确认结果
     * @param cause :错误原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info(correlationData.getId() +","+ack +","+cause);
    }
}
​

然后自定义template,把2个回调设置给template

​
//以下配置RabbitMQ消息服务
@Autowired
public ConnectionFactory connectionFactory;
​
@Autowired
private RabbitMQCallback rabbitMQCallback;
​
@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    // 这里的转换器设置实现了发送消息时自动序列化消息对象为message body
    template.setMandatory(true);
    template.setReturnsCallback(rabbitMQCallback);
    template.setConfirmCallback(rabbitMQCallback);
​
    return template;
}
可靠的消息投递方案
  1. 设计一个消息日志表,可以基于数据库,也可以基于Redis,字段有:交换机,路由key, 消息内容,发送状态(发送中,发送成功,发送失败),重试次数等。

  2. 在使用rabbitTemplate发布消息之前,把消息的内容持久化到 :消息日志表中,状态为:发送中

  3. 通过回调来监听消息发送结果,如果成功,把消息日志状态修改为:成功,如果发送失败,把消息日志修改为:失败

  4. 额外创建定时任务,定时读取失败的日志进行重试发送,并增加重试次数,直到发送成功,如果发送到一定次数依然不成功就不再重试,而是通知管理员抢修。

如何保证消息的可靠性?

消息从生产者到消费者的每一步都可能导致消息丢失:

1.发送消息时丢失:

生产者发送消息时连接MQ失败

生产者发送消息到达MQ后未找到Exchange

生产者发送消息到达MQ的Exchange后,未找到合适的Queue

消息到达MQ后,处理消息的进程发生异常

2.MQ导致消息丢失:

消息到达MQ,保存到队列后,尚未消费就突然宕机

3.消费者处理消息时:

消息接收后尚未处理突然宕机

消息接收后处理过程中抛出异常

综上,我们要解决消息丢失问题,保证MQ的可靠性,就必须从3个方面入手:

1.确保生产者一定把消息发送到MQ

2.确保MQ不会将消息弄丢

3.确保消费者一定要处理消息

生产者可靠性:

(一)生产者重试机制

为解决生产者发送消息时网络故障导致MQ连接出现问题。

通过配置文件开启连接超时重试机制。当连接MQ失败后就会重试,重新连接。

该重试机制是阻塞式重试,当网络波动的时候,会导致当前线程进行阻塞,会影响程序性能,因此我们一般不开启超时重试机制。如果一定要开启的话,可以使用一个单独的线程来进行消息的发送。这样就不会影响主线程的运行。

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

(二)生产者确认机制

生产者发送消息到交换机以后,可能由于未找到正确的交换机或者是队列导致消息丢失,因此当消息正确且完整地完成发送以后,MQ需要向生产者返回一个回执,用来告诉生产者消息正确发送。

总结如下:

当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功

临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功

持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功

其它情况都会返回NACK,告知投递失败

其中ack和nack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。默认两种机制都是关闭状态,需要通过配置文件来开启。

通过配置文件的方式开启生产者确认机制:

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制

  • simple:同步阻塞等待MQ的回执

  • correlated:MQ异步回调返回回执

由于消息到达交换机但是路由失败的时候也是路由成功,同时会发送一些异常信息,我们需要捕获这个异常信息,进行一些日志的记录

因此我们需要为rabbitTemplate定义returnsCallback来接受路由失败返回的异常信息

package com.itheima.publisher.config;
 
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
 
import javax.annotation.PostConstruct;
 
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
    private final RabbitTemplate rabbitTemplate;
 
    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.error("触发return callback,");
                log.debug("exchange: {}", returned.getExchange());
                log.debug("routingKey: {}", returned.getRoutingKey());
                log.debug("message: {}", returned.getMessage());
                log.debug("replyCode: {}", returned.getReplyCode());
                log.debug("replyText: {}", returned.getReplyText());
            }
        });
    }
}

生产者确认机制,在发送不同的消息的时候,我们还需要对MQ返回的结果进行处理,比如消息发送成功和发送失败分别需要做些什么事情。下面代码就是生产者对于消息发送失败成功的不同处理。

@Test
void testPublisherConfirm() {
    // 1.创建CorrelationData
    CorrelationData cd = new CorrelationData();
    // 2.给Future添加ConfirmCallback
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onFailure(Throwable ex) {
            // 2.1.Future发生异常时的处理逻辑,基本不会触发
            log.error("send message fail", ex);
        }
        @Override
        public void onSuccess(CorrelationData.Confirm result) {
            // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
            if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
                log.debug("发送消息成功,收到 ack!");
            }else{ // result.getReason(),String类型,返回nack时的异常描述
                log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
            }
        }
    });
    // 3.发送消息
    rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}
MQ的可靠性:

消息到达MQ以后如果不能够及时地保存,那么也会导致消息丢失。默认情况下,消息是存储在MQ的内存中,MQ重启或者宕机会导致消息丢失。因此数据的持久化(将消息保存到硬盘中)就很有必要。

数据的持久化分为:

  • 交换机持久化

  • 队列持久化

  • 消息持久化

(一)交换机持久化

在创建交换机的时候通过durable来实现交换机的持久化。

@Configuration
public class RabbitMQConfig {
    
    @Bean
    public DirectExchange directExchange() {
        // 参数说明:交换机名称、是否持久化、是否自动删除
        return new DirectExchange("my.direct", true, false);
    }
    
    @Bean
    public FanoutExchange fanoutExchange() {
        return ExchangeBuilder.fanoutExchange("my.fanout")
                .durable(true)      // 设置持久化
                .build();
    }
    
    @Bean
    public TopicExchange topicExchange() {
        return ExchangeBuilder.topicExchange("my.topic")
                .durable(true)      // 设置持久化
                .build();
    }
}

(二)队列持久化

@Configuration
public class RabbitMQConfig {
    
    @Bean
    public Queue orderQueue() {
        // 方式一:直接创建
        return new Queue("order.queue", true);  // 第二个参数true表示持久化
        
        // 方式二:使用QueueBuilder
        return QueueBuilder.durable("order.queue")  // durable表示持久化
                .build();
    }
}

(三)消息持久化

一般通过消息的属性来设置消息的持久化

public void sendMessage(String message) {
        // 1. 设置消息属性
        MessageProperties properties = new MessageProperties();
        // 设置消息持久化
        properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        // 设置消息ID
        properties.setMessageId(UUID.randomUUID().toString());
        // 设置消息优先级
        properties.setPriority(0);
        
        Message msg = new Message(message.getBytes(), properties);
        
        // 2. 添加发送确认回调
        CorrelationData correlationData = new CorrelationData();
        correlationData.getFuture().addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                if (result.isAck()) {
                    log.info("消息发送成功: {}", properties.getMessageId());
                } else {
                    log.error("消息发送失败: {}", result.getReason());
                    // 处理发送失败的消息
                    handleFailedMessage(msg);
                }
            }
            
            @Override
            public void onFailure(Throwable ex) {
                log.error("消息发送异常", ex);
            }
        });
        
        // 3. 发送消息
        rabbitTemplate.send("exchange.name", "routing.key", msg, correlationData);
    }

需要声明一下rabbitTemplate.send和rabbitTemplate.convertAndSend的区别:

1.消息转换

// convertAndSend:自动转换
rabbitTemplate.convertAndSend("", "queueName", "Hello"); // 直接发送字符串
​
// send:需要手动转换
MessageProperties properties = new MessageProperties();
Message msg = new Message("Hello".getBytes(), properties);
rabbitTemplate.send("exchangeName", "routingKey", msg);

2.消息属性设置

// convertAndSend:需要使用MessagePostProcessor
rabbitTemplate.convertAndSend("", "queueName", "Hello", message -> {
    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
    return message;
});
​
// send:直接设置MessageProperties
MessageProperties properties = new MessageProperties();
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message msg = new Message("Hello".getBytes(), properties);
rabbitTemplate.send("exchangeName", "routingKey", msg);

3.发送确认

// convertAndSend:需要额外设置回调
rabbitTemplate.convertAndSend("", "queueName", "Hello", new CorrelationData());
​
// send:直接支持回调
CorrelationData correlationData = new CorrelationData();
correlationData.getFuture().addCallback(new ListenableFutureCallback<>() {
    @Override
    public void onSuccess(CorrelationData.Confirm result) {
        if (result.isAck()) {
            log.info("消息发送成功");
        }
    }
    @Override
    public void onFailure(Throwable ex) {
        log.error("消息发送失败", ex);
    }
});
rabbitTemplate.send("exchange", "routingKey", msg, correlationData);

使用建议:

使用 convertAndSend 当:

简单的消息发送场景

不需要详细的消息属性设置

使用默认交换机直接发送到队列

使用 send 当:

需要完整控制消息属性

需要详细的发送确认回调

复杂的消息发送场景

需要消息追踪和监控

Lazy Queue说明

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:

消费者宕机或出现网络故障

消息发送量激增,超过了消费者处理速度

消费者处理业务发生阻塞

一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut. PageOut会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。

为了解决这个问题,从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:

接收到消息后直接存入磁盘而非内存

消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)

支持数百万条的消息存储

而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。

消费者可靠性

(一)消费者确认机制

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活

auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:

如果是业务异常,会自动返回nack;

如果是消息处理或校验异常,自动返回reject;

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 自动ack

(二)失败重试机制

如果消费者业务出现异常,并且一直无法得到解决,那么消息就会一直重新入队,并重新发送给消费者进行处理,会陷入一个无线循环导致MQ的压力过大。因此我们可以开启失败重试机制,让消费者业务出现异常的时候先在本地进行重试而不是立即重新入队。当重试达到最大次数以后就会返回reject抛弃消息。

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

(三)失败处理策略

在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机 我们一般选择将失败的消息投递到一个指定的交换机,然后交由人工进行处理。

@Configuration
public class RabbitMQConfig {
    
    @Bean
    public DirectExchange errorExchange() {
        return new DirectExchange("error.exchange");
    }
    
    @Bean
    public Queue errorQueue() {
        return new Queue("error.queue");
    }
    
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {
        return BindingBuilder.bind(errorQueue)
                .to(errorExchange)
                .with("error.routing.key");
    }
    
    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
        // 创建 RepublishMessageRecoverer
        return new RepublishMessageRecoverer(
            rabbitTemplate,           // RabbitTemplate
            "error.exchange",         // 错误交换机
            "error.routing.key"       // 错误路由键
        );
    }
}
业务幂等性

什么是业务幂等性?

有时候可能因为某些原因导致消息的重复消费,如果在消息重复消费的情况下,业务仍然和消费一次的情况是相同的,这就是业务的幂等性。

有些业务天然具备幂等性:比如,删除操作,查询操作。

不具备幂等性的业务:比如,扣减库存等。(重复消费消息会导致重复扣减库存)

解决方案:

(一)唯一消息id

每一条消息都生成一个唯一的id,与消息以同投递给消费者。消费者拿到id以后先去数据库中查询是否已经存在,如果不存在则处理消息,并将该消息id存入数据库,否则则将该消息进行废弃。

如何生成消息的唯一id呢?

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

定义消息转换器的同时为消息自动创建id。

什么是消息转换器呢?(进行序列化的一个工具)

消息其实就是字节数组,因为数据的传递是以二进制流的方式进行传递的因此我们需要将原始数据进行转换以后才能够正确发送

比如我们希望传递一个字符串或者对象作为消息,在传递消息的过程中是会将我们的原始消息转换成一个Message对象然后再进行传递(数据的传递应该是以二进制流的方式进行传递)。

(二)业务判断

比如我们的业务逻辑是修改订单状态,我们可以在修改订单状态之前判断该订单是否已经修改了状态。

补充知识:

rabbitmq中常用的消息转换器

// 1. SimpleMessageConverter (默认)
// 支持String、Serializable对象、byte[]的转换
@Bean
public MessageConverter simpleMessageConverter() {
    return new SimpleMessageConverter();
}
​
// 2. Jackson2JsonMessageConverter 
// 支持JSON格式的消息转换
@Bean
public MessageConverter jsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
}
​
// 3. ContentTypeDelegatingMessageConverter
// 根据ContentType选择不同的转换器
@Bean
public MessageConverter delegatingMessageConverter() {
    ContentTypeDelegatingMessageConverter converter = new ContentTypeDelegatingMessageConverter();
    converter.addDelegate("application/json", new Jackson2JsonMessageConverter());
    converter.addDelegate("application/xml", new MarshallingMessageConverter());
    return converter;
}

第一种默认的转换器,如果我们需要转换对象的话那么我们对象需要继承serializable接口才能够正确进行消息的转换。

如果我们使用的是第二种支持JSON格式的消息转换器,那么对象无需继承serializable接口就可以进行消息转换。

其实消息转换就是序列化的过程(将原始数据变为二进制数据)

这就能联想到我们平时写controller接口的时候,后端向前端返回结果对象,我们是没有实现serializable接口的但是我们能够正常地进行数据传输。

这是因为我们HTTP响应默认是采用了json序列化方式,而我们SimpleMessageConverter则是采用了java的序列化方式。

兜底方案:

如果通过以上的预防措施,MQ消息的传递仍然出现问题,我们可以让消费者主动查询。

比如交易服务在等待支付服务的消息来修改支付订单的支付状态,但是支付服务发送消息失败了,那么我们可以让交易服务自己主动去查询判断是否完成了支付。那么我们什么时候需要去主动查询呢?我们可以创建一个定时任务,每隔几秒进行一次支付查询。

延迟消息

我们在下单后都会有一个30min的支付时间,如果超时未支付就会取消订单,我们要完成这个功能就需要延迟消息。

死信交换机

什么是死信?

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false

  • 消息是一个过期消息,超时无人消费

  • 要投递的队列消息满了,无法投递

    简单来说就是这些消息没有作用了

死信交换机就会来负责处理这些死信的交换机。比如我们可以这样设计:

一个队列让它没有消费者进行监听,这样发送到该队列的消息也就无法进行处理,比如我们给消息设置过期时间为3min,这样该消息就一定会过期,3min后过期就会投递到死信交换机,然后分配给队列,消费者进行处理。这样就达到了延迟消息的效果

img

DelayExchange插件

通过下载这样一个插件来完成延迟消息,后续进行补充。

Logo

开源鸿蒙跨平台开发社区汇聚开发者与厂商,共建“一次开发,多端部署”的开源生态,致力于降低跨端开发门槛,推动万物智联创新。

更多推荐