RabbitMQ消息队列

RabbitMQ消息队列

rabbit教程

RabbitMq的应用架构

image-20230928153924426

RabbitMq使用场景

image-20230925182010047

  1. 应用解藕

image-20230925182143263

  1. 异步提速
image-20230925182300098
  1. 削峰填谷
image-20230925182347965

快速开始

RabbitMq启动

1
2
3
4
5
cd E:\APP\RabbitMq\rabbitmq_server-3.7.4\sbin

.\rabbitmq-plugins enable rabbitmq_management rabbitmq_delayed_message_exchange

.\rabbitmqctl start_app

image-20230926150604350

注意:这里的routingkey有三种情况

  1. 生产者to消费者模式,中间没有交换机,routingKey为队列名称
  2. Work Queue工作队列模式,存在交换机并且指定消息分发模式(fanout),则routingKey不起作用,为空字符串,
  3. Routing路由工作模式(direct),存在交换机,交换机按照路由routingkey将消息分发到不同队列

1.发送消息

image-20230926151430172

2.接收消息

image-20230926153049867

RabbitMq消费模式

  1. 生产者to消费者模式

image-20230928154039058

  1. Work Queue工作队列模式

image-20230928154101895
  1. Pub/Sub工作模式(fanout)

直接绑定即可,无需指定路由

image-20230928154015561
  1. Routing路由工作模式(direct)

image-20230928154153002
  1. Topic通配符模式(topic)

image-20230926214529971

**#**:代表0个或者多个字符串; *****:代表1个字符串;

Springboot整合RabbitMq

先提醒一个巨大的坑!:

如果你配置了交换机和队列但是没有显示!是因为:

SpringBoot总是在第一次对RabbitMq做操作时(发送消息,消费消息),才会将配置的交换机加载进去。

连接也是在这个时候才会创建!

  1. 引入依赖
1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 配置基础信息(账号,密码)
1
2
3
4
5
6
7
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
  1. 配置交换机,队列和他们的绑定关系
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Configuration
@Slf4j
public class RabbitMqConfig {
public static final String EXCHANGE_NAME = "boot_topic_exchange";
public static final String QUEUE_NAME = "boot_queue";

@Bean("bootExchange")
public Exchange bootExchange(){
log.info("bootExchange加载成功");
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
@Bean("bootQueue")
public Queue bootQueue(){
log.info("bootQueue加载成功");
return QueueBuilder.durable(QUEUE_NAME).build();
}
@Bean
public Binding bootBinding(@Qualifier("bootExchange") Exchange exchange,@Qualifier("bootQueue") Queue queue){
log.info("绑定成功");
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}


}

//durable 队列是否持久化,false:队列在内存中,服务器挂掉后,队列就没了;true:服务器重启后,队列将会重新生成.注意:只是队列持久化,不代表队列中的消息持久化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Configuration
@Slf4j
public class RabbitMqConfig {
public static final String HOTEL_EXCHANGE = "hotel.topic";
public static final String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
public static final String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
public static final String HOTEL_INSERT_KEY = "hotel.insert";
public static final String HOTEL_DELETE_KEY = "hotel.delete";
@Bean
public TopicExchange bootExchange(){
return new TopicExchange(HOTEL_EXCHANGE,true,false);
}
@Bean
public Queue hotelInsertQueue(){
return new Queue(HOTEL_INSERT_QUEUE,true);
}
@Bean
public Queue hotelDeleteQueue(){
return new Queue(HOTEL_DELETE_QUEUE,true);
}
@Bean
public Binding insertBinding(){
log.info("insertBinding绑定成功");
return BindingBuilder.bind(hotelInsertQueue()).to(bootExchange()).with(HOTEL_INSERT_KEY);
}
@Bean
public Binding deleteBinding(){
log.info("deleteBinding绑定成功");
return BindingBuilder.bind(hotelDeleteQueue()).to(bootExchange()).with(HOTEL_DELETE_KEY);
}
}
  1. 向队列发送消息
1
2
3
4
@Test
void producer() {
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"boot.fansea","Hello,springboot!");
}
  1. 在队列读取消息
1
2
3
4
5
6
7
8
9
10
@Component
public class RabbitMqListener {

@RabbitListener(queues = "boot_queue")
public void ListenQueue(Message message){
System.out.println(new String(message.getBody()));
}
}


生产方Confirm

  1. Confirm

image-20231010152336777

  1. return

image-20231010152417650

image-20231009213745543

springBoot配置

1
2
3
4
5
6
7
8
9
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
publisher-confirms: true
publisher-returns: true

消费方ACK

  1. 继承channelAwareMessageListener实现方法,获取channel参数
  2. 利用channel.basicAck()手动签收,channel.basicNack()在失败之后重新接受

image-20231010150849615

消费端限流

image-20231010153647692

1
2
3
4
listener:
direct:
acknowledge-mode: manual
prefetch: 1

设置过期时间

这里可以用于创建订单,订单设置过期时间

TTL:Time to live

  1. 设置整体队列消息的TTL
1
2
3
4
5
@Bean("bootQueue")
public Queue bootQueue() {
log.info("bootQueue加载成功");
return QueueBuilder.durable(QUEUE_NAME).ttl(100000).build();
}
  1. 设置单条消息的ttl
1
2
3
4
5
6
7
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, "boot.fansea", "Hello,springboot!", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");
return message;
}
});

这里消息比队列还提前过期了,不会立即移除,只会在他要被消费时,才会判断他是否过期,以提高效率

死信消息

image-20231010165718486

image-20231010170441496

延迟队列(TTL+死信队列)

延迟队列用于订单系统中,不需要轮询数据库,只需要对数据库进行一次操作,极大提高效率

高效率解决订单处理问题

  1. 创建普通队列和死信队列,相互绑定
  2. 生产者向普通队列发送订单消息
  3. 消费者读取死信队列的消息

image-20231010191719520

image-20231010192012231

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/* RabbitMQ的交换机、队列配置文件 */
@Configuration
public class ExchangeQueueConfig {

public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String DEAD_LETTER_QUEUE = "QD";

/*创建X交换机*/
@Bean
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}

/*创建死信交换机*/
@Bean
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}

//声明队列 A ttl 为 10s 并绑定到对应的死信交换机
@Bean("queueA")
public Queue queueA(){
Map<String, Object> args = new HashMap<>(3);
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前队列绑定的死信交换机
args.put("x-dead-letter-routing-key", "YD"); //声明当前队列的死信路由 key
args.put("x-message-ttl", 10000); //声明队列的 TTL
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}


// 声明队列 A 绑定 X 交换机
@Bean
public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}


//声明队列 B ttl 为 40s 并绑定到对应的死信交换机
@Bean("queueB")
public Queue queueB(){
Map<String, Object> args = new HashMap<>(3);
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前队列绑定的死信交换机
args.put("x-dead-letter-routing-key", "YD"); //声明当前队列的死信路由 key
args.put("x-message-ttl", 40000); //声明队列的 TTL
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}

//声明队列 B 绑定 X 交换机
@Bean
public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
}

//声明死信队列 QD
@Bean("queueD")
public Queue queueD(){
return new Queue(DEAD_LETTER_QUEUE);
}

//声明死信队列 QD 绑定关系
@Bean
public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}


}

延时队列插件

image-20231106092946864

消息的可靠性保障——消息补偿

无非就是发送两次消息,进行比对,如果对应MDB中存在有数据并且比对成功延时消息,则说明百分百发送成功,其中延时发送消息不做业务操作,它里面最重要的参数就是消息ID。

如果延时发送消息和业务消息同时失败,第9步定时检查任务会比对Producer数据库与MDB数据库数量是否相等,如果不相等则重新调用producer发送未被消费的消息,直至消费成功为止。

image-20231010205253200

消息的幂等性——乐观锁机制

image-20231010211156807

发送消息带上版本号,操作数据库判断是否为对应版本以解决幂等性问题

思考题

与RocketMq的区别

  • 消息模型:RocketMQ主要支持发布/订阅模式和点对点模式,而RabbitMQ支持更丰富的消息模型,包括Direct、Fanout、Topic、Headers等。

  • 性能:RocketMQ在处理大规模消息吞吐量方面表现更好,适用于高并发场景;RabbitMQ虽然也能处理大量消息,但在性能上可能不如RocketMQ,RocketMQ则更注重顺序消息和事务消息的支持,适用于对消息顺序和一致性要求较高的场景

RocketMq与Kafaka的区别