消息队列用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
在application.yml文件中配置web以及rabbitmq的配置信息
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: 9YoIu
# 发送者开启 return 确认机制
publisher-returns: true
# 发送者开启 confirm 确认机制
publisher-confirm-type: correlated
listener.simple:
# 设置消费端手动 ack
acknowledge-mode: manual
# 是否支持重试
retry:
enabled: true
在maven中引入依赖:
<!--MQ消息队列-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
使用@EnableRabbit注解在入口类上启用MQ
@SpringBootApplication
@EnableRabbit
public class SmsEmailApplication {
public static void main(String[] args) {
SpringApplication.run(SmsEmailApplication.class, args);
}
}
新建rabbitmq配置类
@Configuration
public class RabbitmqConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
//配置正常业务 例如邮件发送 的队列交换机与通道
public static String Queue1 = "queue_1";
public static String Exchange1 = "exchange_1";
public static String Routing1 = "routing_key_1";
/**
* 定义死信队列相关信息
*/
public final static String deadQueueName = "dead_queue";
public final static String deadRoutingKey = "dead_routing_key";
public final static String deadExchangeName = "dead_exchange";
/**
* 死信队列 交换机标识符
*/
public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
/**
* 死信队列交换机绑定键标识符
*/
public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
/**
* 创建死信交换机
*/
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(deadExchangeName);
}
/**
* 创建配置死信队列
*
* @return
*/
@Bean
public Queue deadQueue() {
Queue queue = new Queue(deadQueueName, true);
return queue;
}
/**
* 死信队列与死信交换机绑定
*/
@Bean
public Binding bindingDeadExchange() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(deadRoutingKey);
}
/**
* 队列绑定到死信
* 第一个参数是创建的queue的名字,第二个参数是是否支持持久化
* @return
*/
@Bean
public Queue EmailQueue() {
// 将普通队列绑定到死信队列交换机上
Map<String, Object> args = new HashMap<>(2);
args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
Queue queue = new Queue(Queue1, true, false, false, args);
return queue;
}
/**
* 创建交换机
* 一共有三种构造方法,可以只传exchange的名字, 第二种,可以传exchange名字,是否支持持久化,是否可以自动删除,
* 第三种在第二种参数上可以增加Map,Map中可以存放自定义exchange中的参数
* @return
*/
@Bean
public DirectExchange EmailExchange() {
return new DirectExchange(Exchange1, true, false);
}
/**
* 绑定
* @param Queue
* @param Exchange
* @return
*/
@Bean
public Binding bindingFinanceExchange(Queue Queue1, DirectExchange Exchange) {
return BindingBuilder.bind(EmailQueue()).to(Exchange1).with(Routing);
}
}
新建消费者类RabbitReceiver
/** 消息队列监听器
*
* @param message
*/
@RabbitListener(queues = "queue_1")
@RabbitHandler
public void process(JSONObject data, Channel channel, Message message) throws IOException {
//消息手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//消息重新投递
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
//消息是否重复
if(message.getMessageProperties().getRedelivered()){
//拒绝消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
//拒绝消费消息(丢失消息) 重新投递给死信队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
/**
* 死信 消息队列消费者
* 当无法消费且被投递至死信队列则再次被死信消费
* @param message
* @param headers
* @param channel
* @throws Exception
*/
@RabbitListener(queues = "dead_queue")
@RabbitHandler
public void deadProcess(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
// 获取消息Id
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "UTF-8");
// // 手动ack
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 手动签收
channel.basicAck(deliveryTag, false);
log.error("日志已记录....");
}
投递一条消息
@Resource
RabbitTemplate rabbitTemplate;
//生成一个随机消息ID
public static Message getId (JSONObject data){
return MessageBuilder.withBody(JSON.toJSONString(data).getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding("utf-8")
.setMessageId(UUID.randomUUID()+"")
.build();
}
/**
* 投递消息
* 通过自定义设置消息体给每一个消息设置一个唯一ID,方便失败时候重试
* @param sendData 要发送的数据
* @param async 如果开启此项,交换机会马上把所有的信息都交给所有的消费者,消费者再自行处理,不会因为消费者处理慢而阻塞线程。 返回一个null
* 如果禁用此项,交换机会同步消费者。使用此方法,当确认了所有的消费者都接收成功之后,才接收另外一个 会造成阻塞 返回一个object
* rabbitTemplate.convertAndSend 里接受3个参数 第一个是之前配置的交换机,第二个是配置好的routing_key,第三个是消息体
*/
public Object sendSmsEmail(JSONObject sendData,boolean async){
//自定义消息体 必须给消息指定一个UUID,用于失败重试
Message message = getId(sendData);
if (async){
rabbitTemplate.convertAndSend("exchange_1","routing_key_1",message);
return null;
}else{
return rabbitTemplate.convertSendAndReceive("exchange_1","routing_key_1",message);
}
}
现在我们在浏览器中输入:http://localhost:15672 可以看到一个登录界面
查看队列,features行下的普通交换机有一个 DLX 的标志,就说明已绑定了死信交换机