Springboot rabbitMQ消息队列配置手动确认机制 配置消费者与生产者和死信交换机

消息队列用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 在application.yml文件中配置web以及rabbitmq的配置信息 spring: rabbitmq: host: 127.0.0.1 port: 5672 usern

消息队列用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

在application.yml文件中配置web以及rabbitmq的配置信息

spring:
   rabbitmq:
     host: 127.0.0.1
     port: 5672
     username: root
     password: 9YoIu
     # 发送者开启 return 确认机制
     publisher-returns: true
     # 发送者开启 confirm 确认机制
     publisher-confirm-type: correlated
     listener.simple:
       # 设置消费端手动 ack
       acknowledge-mode: manual
       # 是否支持重试
       retry:
        enabled: true

在maven中引入依赖:

<!--MQ消息队列-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

使用@EnableRabbit注解在入口类上启用MQ

@SpringBootApplication
@EnableRabbit
public class SmsEmailApplication {
    public static void main(String[] args) {
        SpringApplication.run(SmsEmailApplication.class, args);
    }
}

新建rabbitmq配置类

@Configuration
public class RabbitmqConfig {

  @Bean
  public MessageConverter messageConverter() {
      return new Jackson2JsonMessageConverter();
  }

  //配置正常业务 例如邮件发送  的队列交换机与通道
  public static String Queue1 = "queue_1";
  public static String Exchange1 = "exchange_1";
  public static String Routing1 = "routing_key_1";

  /**
   * 定义死信队列相关信息
   */
  public final static String deadQueueName = "dead_queue";
  public final static String deadRoutingKey = "dead_routing_key";
  public final static String deadExchangeName = "dead_exchange";

  /**
   * 死信队列 交换机标识符
   */
  public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
  /**
   * 死信队列交换机绑定键标识符
   */
  public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";

  /**
   * 创建死信交换机
   */
  @Bean
  public DirectExchange deadExchange() {
      return new DirectExchange(deadExchangeName);
  }

  /**
   * 创建配置死信队列
   *
   * @return
   */
  @Bean
  public Queue deadQueue() {
      Queue queue = new Queue(deadQueueName, true);
      return queue;
  }

  /**
   * 死信队列与死信交换机绑定
   */
  @Bean
  public Binding bindingDeadExchange() {
      return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(deadRoutingKey);
  }

  /**
   * 队列绑定到死信
   * 第一个参数是创建的queue的名字,第二个参数是是否支持持久化
   * @return
   */
  @Bean
  public Queue EmailQueue() {
      // 将普通队列绑定到死信队列交换机上
      Map<String, Object> args = new HashMap<>(2);
      args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
      args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
      Queue queue = new Queue(Queue1, true, false, false, args);
      return queue;
  }

   /**
   * 创建交换机
   *  一共有三种构造方法,可以只传exchange的名字, 第二种,可以传exchange名字,是否支持持久化,是否可以自动删除,
   *  第三种在第二种参数上可以增加Map,Map中可以存放自定义exchange中的参数
   * @return
   */
  @Bean
  public DirectExchange EmailExchange() {
      return new DirectExchange(Exchange1, true, false);
  }

   /**
   * 绑定
   * @param Queue
   * @param Exchange
   * @return
   */
  @Bean
  public Binding bindingFinanceExchange(Queue Queue1, DirectExchange Exchange) {
      return BindingBuilder.bind(EmailQueue()).to(Exchange1).with(Routing);
  }
}

新建消费者类RabbitReceiver

/** 消息队列监听器
 * 
 * @param message
 */
@RabbitListener(queues = "queue_1")
@RabbitHandler
public void process(JSONObject data, Channel channel, Message message) throws IOException {
          //消息手动确认
          channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
          //消息重新投递
          channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
            //消息是否重复
          if(message.getMessageProperties().getRedelivered()){
             //拒绝消息
             channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
             //拒绝消费消息(丢失消息) 重新投递给死信队列
             channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
           }
}

/**
 * 死信 消息队列消费者
 * 当无法消费且被投递至死信队列则再次被死信消费
 * @param message
 * @param headers
 * @param channel
 * @throws Exception
 */
@RabbitListener(queues = "dead_queue")
@RabbitHandler
public void deadProcess(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
    // 获取消息Id
    String messageId = message.getMessageProperties().getMessageId();
    String msg = new String(message.getBody(), "UTF-8");
    // // 手动ack 
    Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
    // 手动签收
    channel.basicAck(deliveryTag, false);
    log.error("日志已记录....");
}

投递一条消息

@Resource
   RabbitTemplate rabbitTemplate;

//生成一个随机消息ID
public static Message getId (JSONObject data){
    return MessageBuilder.withBody(JSON.toJSONString(data).getBytes())
            .setContentType(MessageProperties.CONTENT_TYPE_JSON)
            .setContentEncoding("utf-8")
            .setMessageId(UUID.randomUUID()+"")
            .build();
}
    /**
 * 投递消息
 *  通过自定义设置消息体给每一个消息设置一个唯一ID,方便失败时候重试
 * @param sendData 要发送的数据
 * @param async 如果开启此项,交换机会马上把所有的信息都交给所有的消费者,消费者再自行处理,不会因为消费者处理慢而阻塞线程。 返回一个null
 *                如果禁用此项,交换机会同步消费者。使用此方法,当确认了所有的消费者都接收成功之后,才接收另外一个 会造成阻塞 返回一个object
 * rabbitTemplate.convertAndSend 里接受3个参数 第一个是之前配置的交换机,第二个是配置好的routing_key,第三个是消息体
 */
public Object sendSmsEmail(JSONObject sendData,boolean async){
    //自定义消息体 必须给消息指定一个UUID,用于失败重试
    Message message = getId(sendData);
    if (async){
        rabbitTemplate.convertAndSend("exchange_1","routing_key_1",message);
        return null;
    }else{
        return rabbitTemplate.convertSendAndReceive("exchange_1","routing_key_1",message);
    }

}

现在我们在浏览器中输入:http://localhost:15672 可以看到一个登录界面
查看队列,features行下的普通交换机有一个 DLX 的标志,就说明已绑定了死信交换机

LICENSED UNDER CC BY-NC-SA 4.0
Comment