技术漫谈RabbitMQ消息队列
FANSEARabbitMQ消息队列
rabbit教程
RabbitMq的应用架构

RabbitMq使用场景

- 应用解藕

- 异步提速
 
- 削峰填谷
 
快速开始
RabbitMq启动
| 12
 3
 4
 5
 
 | cd E:\APP\RabbitMq\rabbitmq_server-3.7.4\sbin
 .\rabbitmq-plugins enable rabbitmq_management rabbitmq_delayed_message_exchange
 
 .\rabbitmqctl start_app
 
 | 

注意:这里的routingkey有三种情况
- 生产者to消费者模式,中间没有交换机,routingKey为队列名称
- Work Queue工作队列模式,存在交换机并且指定消息分发模式(fanout),则routingKey不起作用,为空字符串,
- Routing路由工作模式(direct),存在交换机,交换机按照路由routingkey将消息分发到不同队列
1.发送消息

2.接收消息

RabbitMq消费模式
- 生产者to消费者模式

- Work Queue工作队列模式
 
- Pub/Sub工作模式(fanout)
直接绑定即可,无需指定路由
 
- Routing路由工作模式(direct)
 
- Topic通配符模式(topic)
 
**#**:代表0个或者多个字符串;     *****:代表1个字符串;
Springboot整合RabbitMq
先提醒一个巨大的坑!:
如果你配置了交换机和队列但是没有显示!是因为:
SpringBoot总是在第一次对RabbitMq做操作时(发送消息,消费消息),才会将配置的交换机加载进去。
连接也是在这个时候才会创建!
- 引入依赖
| 12
 3
 4
 
 | <dependency><groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
 
 | 
- 配置基础信息(账号,密码)
| 12
 3
 4
 5
 6
 7
 
 | spring:rabbitmq:
 host: 127.0.0.1
 port: 5672
 username: guest
 password: guest
 virtual-host: /
 
 | 
- 配置交换机,队列和他们的绑定关系
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 
 | @Configuration@Slf4j
 public class RabbitMqConfig {
 public static final String EXCHANGE_NAME = "boot_topic_exchange";
 public static final String QUEUE_NAME = "boot_queue";
 
 @Bean("bootExchange")
 public Exchange bootExchange(){
 log.info("bootExchange加载成功");
 return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
 }
 @Bean("bootQueue")
 public Queue bootQueue(){
 log.info("bootQueue加载成功");
 return QueueBuilder.durable(QUEUE_NAME).build();
 }
 @Bean
 public Binding bootBinding(@Qualifier("bootExchange") Exchange exchange,@Qualifier("bootQueue") Queue queue){
 log.info("绑定成功");
 return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
 }
 
 
 }
 
 
 
 | 
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 
 | @Configuration@Slf4j
 public class RabbitMqConfig {
 public static final String HOTEL_EXCHANGE = "hotel.topic";
 public static final String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
 public static final String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
 public static final String HOTEL_INSERT_KEY = "hotel.insert";
 public static final String HOTEL_DELETE_KEY = "hotel.delete";
 @Bean
 public TopicExchange bootExchange(){
 return new TopicExchange(HOTEL_EXCHANGE,true,false);
 }
 @Bean
 public Queue hotelInsertQueue(){
 return new Queue(HOTEL_INSERT_QUEUE,true);
 }
 @Bean
 public Queue hotelDeleteQueue(){
 return new Queue(HOTEL_DELETE_QUEUE,true);
 }
 @Bean
 public Binding insertBinding(){
 log.info("insertBinding绑定成功");
 return BindingBuilder.bind(hotelInsertQueue()).to(bootExchange()).with(HOTEL_INSERT_KEY);
 }
 @Bean
 public Binding deleteBinding(){
 log.info("deleteBinding绑定成功");
 return BindingBuilder.bind(hotelDeleteQueue()).to(bootExchange()).with(HOTEL_DELETE_KEY);
 }
 }
 
 | 
- 向队列发送消息
| 12
 3
 4
 
 | @Testvoid producer() {
 rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"boot.fansea","Hello,springboot!");
 }
 
 | 
- 在队列读取消息
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 
 | @Componentpublic class RabbitMqListener {
 
 @RabbitListener(queues = "boot_queue")
 public void ListenQueue(Message message){
 System.out.println(new String(message.getBody()));
 }
 }
 
 
 
 | 
生产方Confirm
- Confirm

- return


springBoot配置
| 12
 3
 4
 5
 6
 7
 8
 9
 
 | spring:rabbitmq:
 host: 127.0.0.1
 port: 5672
 username: guest
 password: guest
 virtual-host: /
 publisher-confirms: true
 publisher-returns: true
 
 | 
消费方ACK
- 继承channelAwareMessageListener实现方法,获取channel参数
- 利用channel.basicAck()手动签收,channel.basicNack()在失败之后重新接受

消费端限流

| 12
 3
 4
 
 | listener:direct:
 acknowledge-mode: manual
 prefetch: 1
 
 | 
设置过期时间
这里可以用于创建订单,订单设置过期时间
TTL:Time to live
- 设置整体队列消息的TTL
| 12
 3
 4
 5
 
 | @Bean("bootQueue")public Queue bootQueue() {
 log.info("bootQueue加载成功");
 return QueueBuilder.durable(QUEUE_NAME).ttl(100000).build();
 }
 
 | 
- 设置单条消息的ttl
| 12
 3
 4
 5
 6
 7
 
 | rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, "boot.fansea", "Hello,springboot!", new MessagePostProcessor() {@Override
 public Message postProcessMessage(Message message) throws AmqpException {
 message.getMessageProperties().setExpiration("5000");
 return message;
 }
 });
 
 | 
这里消息比队列还提前过期了,不会立即移除,只会在他要被消费时,才会判断他是否过期,以提高效率
死信消息


延迟队列(TTL+死信队列)
延迟队列用于订单系统中,不需要轮询数据库,只需要对数据库进行一次操作,极大提高效率
高效率解决订单处理问题
- 创建普通队列和死信队列,相互绑定
- 生产者向普通队列发送订单消息
- 消费者读取死信队列的消息


| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 
 | @Configuration
 public class ExchangeQueueConfig {
 
 public static final String X_EXCHANGE = "X";
 public static final String QUEUE_A = "QA";
 public static final String QUEUE_B = "QB";
 public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
 public static final String DEAD_LETTER_QUEUE = "QD";
 
 
 @Bean
 public DirectExchange xExchange(){
 return new DirectExchange(X_EXCHANGE);
 }
 
 
 @Bean
 public DirectExchange yExchange(){
 return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
 }
 
 
 @Bean("queueA")
 public Queue queueA(){
 Map<String, Object> args = new HashMap<>(3);
 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
 args.put("x-dead-letter-routing-key", "YD");
 args.put("x-message-ttl", 10000);
 return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
 }
 
 
 
 @Bean
 public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
 @Qualifier("xExchange") DirectExchange xExchange){
 return BindingBuilder.bind(queueA).to(xExchange).with("XA");
 }
 
 
 
 @Bean("queueB")
 public Queue queueB(){
 Map<String, Object> args = new HashMap<>(3);
 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
 args.put("x-dead-letter-routing-key", "YD");
 args.put("x-message-ttl", 40000);
 return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
 }
 
 
 @Bean
 public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
 @Qualifier("xExchange") DirectExchange xExchange){
 return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
 }
 
 
 @Bean("queueD")
 public Queue queueD(){
 return new Queue(DEAD_LETTER_QUEUE);
 }
 
 
 @Bean
 public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
 @Qualifier("yExchange") DirectExchange yExchange){
 return BindingBuilder.bind(queueD).to(yExchange).with("YD");
 }
 
 
 }
 
 | 
延时队列插件

消息的可靠性保障——消息补偿
无非就是发送两次消息,进行比对,如果对应MDB中存在有数据并且比对成功延时消息,则说明百分百发送成功,其中延时发送消息不做业务操作,它里面最重要的参数就是消息ID。
如果延时发送消息和业务消息同时失败,第9步定时检查任务会比对Producer数据库与MDB数据库数量是否相等,如果不相等则重新调用producer发送未被消费的消息,直至消费成功为止。

消息的幂等性——乐观锁机制

发送消息带上版本号,操作数据库判断是否为对应版本以解决幂等性问题
思考题
与RocketMq的区别
- 消息模型:RocketMQ主要支持发布/订阅模式和点对点模式,而RabbitMQ支持更丰富的消息模型,包括Direct、Fanout、Topic、Headers等。 
- 性能:RocketMQ在处理大规模消息吞吐量方面表现更好,适用于高并发场景;RabbitMQ虽然也能处理大量消息,但在性能上可能不如RocketMQ,RocketMQ则更注重顺序消息和事务消息的支持,适用于对消息顺序和一致性要求较高的场景