技术漫谈RabbitMQ消息队列
FANSEARabbitMQ消息队列
rabbit教程
RabbitMq的应用架构

RabbitMq使用场景

- 应用解藕

- 异步提速
- 削峰填谷
快速开始
RabbitMq启动
1 2 3 4 5
| cd E:\APP\RabbitMq\rabbitmq_server-3.7.4\sbin
.\rabbitmq-plugins enable rabbitmq_management rabbitmq_delayed_message_exchange
.\rabbitmqctl start_app
|

注意:这里的routingkey有三种情况
- 生产者to消费者模式,中间没有交换机,routingKey为队列名称
- Work Queue工作队列模式,存在交换机并且指定消息分发模式(fanout),则routingKey不起作用,为空字符串,
- Routing路由工作模式(direct),存在交换机,交换机按照路由routingkey将消息分发到不同队列
1.发送消息

2.接收消息

RabbitMq消费模式
生产者to消费者模式

Work Queue工作队列模式
Pub/Sub工作模式(fanout)
直接绑定即可,无需指定路由
Routing路由工作模式(direct)
Topic通配符模式(topic)
**#**:代表0个或者多个字符串; *****:代表1个字符串;
Springboot整合RabbitMq
先提醒一个巨大的坑!:
如果你配置了交换机和队列但是没有显示!是因为:
SpringBoot总是在第一次对RabbitMq做操作时(发送消息,消费消息),才会将配置的交换机加载进去。
连接也是在这个时候才会创建!
- 引入依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
- 配置基础信息(账号,密码)
1 2 3 4 5 6 7
| spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: /
|
- 配置交换机,队列和他们的绑定关系
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Configuration @Slf4j public class RabbitMqConfig { public static final String EXCHANGE_NAME = "boot_topic_exchange"; public static final String QUEUE_NAME = "boot_queue";
@Bean("bootExchange") public Exchange bootExchange(){ log.info("bootExchange加载成功"); return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } @Bean("bootQueue") public Queue bootQueue(){ log.info("bootQueue加载成功"); return QueueBuilder.durable(QUEUE_NAME).build(); } @Bean public Binding bootBinding(@Qualifier("bootExchange") Exchange exchange,@Qualifier("bootQueue") Queue queue){ log.info("绑定成功"); return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs(); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @Configuration @Slf4j public class RabbitMqConfig { public static final String HOTEL_EXCHANGE = "hotel.topic"; public static final String HOTEL_INSERT_QUEUE = "hotel.insert.queue"; public static final String HOTEL_DELETE_QUEUE = "hotel.delete.queue"; public static final String HOTEL_INSERT_KEY = "hotel.insert"; public static final String HOTEL_DELETE_KEY = "hotel.delete"; @Bean public TopicExchange bootExchange(){ return new TopicExchange(HOTEL_EXCHANGE,true,false); } @Bean public Queue hotelInsertQueue(){ return new Queue(HOTEL_INSERT_QUEUE,true); } @Bean public Queue hotelDeleteQueue(){ return new Queue(HOTEL_DELETE_QUEUE,true); } @Bean public Binding insertBinding(){ log.info("insertBinding绑定成功"); return BindingBuilder.bind(hotelInsertQueue()).to(bootExchange()).with(HOTEL_INSERT_KEY); } @Bean public Binding deleteBinding(){ log.info("deleteBinding绑定成功"); return BindingBuilder.bind(hotelDeleteQueue()).to(bootExchange()).with(HOTEL_DELETE_KEY); } }
|
- 向队列发送消息
1 2 3 4
| @Test void producer() { rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"boot.fansea","Hello,springboot!"); }
|
- 在队列读取消息
1 2 3 4 5 6 7 8 9 10
| @Component public class RabbitMqListener {
@RabbitListener(queues = "boot_queue") public void ListenQueue(Message message){ System.out.println(new String(message.getBody())); } }
|
生产方Confirm
- Confirm

- return


springBoot配置
1 2 3 4 5 6 7 8 9
| spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / publisher-confirms: true publisher-returns: true
|
消费方ACK
- 继承channelAwareMessageListener实现方法,获取channel参数
- 利用channel.basicAck()手动签收,channel.basicNack()在失败之后重新接受

消费端限流

1 2 3 4
| listener: direct: acknowledge-mode: manual prefetch: 1
|
设置过期时间
这里可以用于创建订单,订单设置过期时间
TTL:Time to live
- 设置整体队列消息的TTL
1 2 3 4 5
| @Bean("bootQueue") public Queue bootQueue() { log.info("bootQueue加载成功"); return QueueBuilder.durable(QUEUE_NAME).ttl(100000).build(); }
|
- 设置单条消息的ttl
1 2 3 4 5 6 7
| rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, "boot.fansea", "Hello,springboot!", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("5000"); return message; } });
|
这里消息比队列还提前过期了,不会立即移除,只会在他要被消费时,才会判断他是否过期,以提高效率
死信消息


延迟队列(TTL+死信队列)
延迟队列用于订单系统中,不需要轮询数据库,只需要对数据库进行一次操作,极大提高效率
高效率解决订单处理问题
- 创建普通队列和死信队列,相互绑定
- 生产者向普通队列发送订单消息
- 消费者读取死信队列的消息


1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| @Configuration public class ExchangeQueueConfig {
public static final String X_EXCHANGE = "X"; public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; public static final String DEAD_LETTER_QUEUE = "QD";
@Bean public DirectExchange xExchange(){ return new DirectExchange(X_EXCHANGE); }
@Bean public DirectExchange yExchange(){ return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); }
@Bean("queueA") public Queue queueA(){ Map<String, Object> args = new HashMap<>(3); args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); args.put("x-dead-letter-routing-key", "YD"); args.put("x-message-ttl", 10000); return QueueBuilder.durable(QUEUE_A).withArguments(args).build(); }
@Bean public Binding queueaBindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueA).to(xExchange).with("XA"); }
@Bean("queueB") public Queue queueB(){ Map<String, Object> args = new HashMap<>(3); args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); args.put("x-dead-letter-routing-key", "YD"); args.put("x-message-ttl", 40000); return QueueBuilder.durable(QUEUE_B).withArguments(args).build(); }
@Bean public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queue1B).to(xExchange).with("XB"); }
@Bean("queueD") public Queue queueD(){ return new Queue(DEAD_LETTER_QUEUE); }
@Bean public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange){ return BindingBuilder.bind(queueD).to(yExchange).with("YD"); }
}
|
延时队列插件

消息的可靠性保障——消息补偿
无非就是发送两次消息,进行比对,如果对应MDB中存在有数据并且比对成功延时消息,则说明百分百发送成功,其中延时发送消息不做业务操作,它里面最重要的参数就是消息ID。
如果延时发送消息和业务消息同时失败,第9步定时检查任务会比对Producer数据库与MDB数据库数量是否相等,如果不相等则重新调用producer发送未被消费的消息,直至消费成功为止。

消息的幂等性——乐观锁机制

发送消息带上版本号,操作数据库判断是否为对应版本以解决幂等性问题
思考题
与RocketMq的区别
消息模型:RocketMQ主要支持发布/订阅模式和点对点模式,而RabbitMQ支持更丰富的消息模型,包括Direct、Fanout、Topic、Headers等。
性能:RocketMQ在处理大规模消息吞吐量方面表现更好,适用于高并发场景;RabbitMQ虽然也能处理大量消息,但在性能上可能不如RocketMQ,RocketMQ则更注重顺序消息和事务消息的支持,适用于对消息顺序和一致性要求较高的场景