RocketMq

RocketMq

RocketMq主要采用发布订阅的方式,发布订阅又有哪些有点呢:解耦、异步、可伸缩、负载均衡

  • 解耦:发布者和订阅者之间没有直接依赖关系
  • 异步:增快响应速度
  • 可伸缩:根据需要动态地添加或删除订阅者
  • 负载均衡:将消息均匀地分配给多个消费者,避免某个消费者过载

快速开始

本地部署 RocketMQ | RocketMQ (apache.org)

常用部署组件

2.1 Broker

Broker是RocketMQ的核心,大部分工作都在Broker中完成,包括接收请求处理消费消费持久消息的HA,以及服务端过滤等都在里面完成

就相当于这个Rocket服务本身:包括了主题,消息队列

Broker 是 RocketMQ 的核心服务节点,负责接收来自 Producer(生产者)的消息、存储消息以及将消息转发给 Consumer(消费者)。在 RocketMQ 架构中,Broker 分为 Master Broker 和 Slave Broker(或称为 Replica)两种角色:

  • Master Broker:处理消息的读写请求,是生产者发送消息和消费者消费消息的主要目标。
  • Slave Broker:作为 Master Broker 的副本存在,主要用于备读或者故障切换,以提高系统的可用性。在 Master Broker出故障时,Slave Broker 里的消息依然可以被消费,或者直接被提升为新的 Master 继续提供服务。

Broker 的部署可以是单节点的,也可以是集群式的,甚至支持同机器、同进程混布的模式。
其具体部署形式可以参考 Broker 部署指南

2.2 NameServer

相当于Nacos,用于注册发现服务,维护Broker路由,监测服务状态

NameServer 是 RocketMQ 的路由服务集群,它不保存任何消息数据,而是维护着所有 Broker 的路由信息(包括 Broker 存活情况、各 topic 的路由信息等)。Producer 和 Consumer 在发送或订阅消息前,都需要先从 NameServer 获取 Broker 的路由信息,之后才能正确地与 Broker 通信。NameServer 支持集群部署,以确保高可用。

该组件的部署相对简单,可以参见文档 NameServer 部署指南

2.3 Proxy

Proxy 是 RocketMQ 为了提高性能和简化客户端接入而引入的一个可选组件。它作为一个轻量级的代理服务器,位于客户端与 Broker 之间,主要职责包括:

  • 负载均衡:自动为客户端分配合适的 Broker,减轻客户端的负担。
  • 协议转换:支持多种协议接入,使得不同语言的客户端能够更容易地与 RocketMQ 集成。
  • 安全控制:可以作为一层安全网关,实现访问控制、鉴权等功能。
  • 网络优化:通过缓存、连接复用等技术优化网络通信效率。

该组件的部署相对独立,可以选用不同模式进行部署,如 Local 模式以及 Cluster 模式,你可以参考Proxy 部署指南进行部署。

2.4 Controller

Controller是 RocketMQ 5.0 新增的组件,它充当控制平面的角色,负责管理和协调系统的整体状态。它主要出现在可切换架构的部署过程中,如 Broker 宕机时,它将进行调度,对 Broker 集群进行选举管理。Controller 本身也支持集群部署,基于 Raft 实现容灾。

该组件的部署可以参考Broker 部署指南中的主备自动切换模式部署指南章节。

2.5 Dashboard

RocketMQ Dashboard 是 RocketMQ 提供的一款可视化管理界面,它允许用户通过网页浏览器直观地监控和管理 RocketMQ 集群的状态,包括但不限于查看消息队列、消费进度、Broker 健康状况、主题配置、消费组详情等信息。Dashboard 的存在大大提升了运维人员对 RocketMQ 集群的监控能力和管理效率。

该组件的部署可以参考DashBoard 部署指南

结构

消息生产

生产者(Producer)

Apache RocketMQ 中用于产生消息的运行实体,一般集成于业务调用链路的上游。生产者是轻量级匿名无身份的。

消息存储

  • 主题(Topic)

    Apache RocketMQ 消息传输和存储的分组容器,主题内部由多个队列组成,消息的存储和水平扩展实际是通过主题内的队列实现的。

  • 队列(MessageQueue)

    Apache RocketMQ 消息传输和存储的实际单元容器,类比于其他消息队列中的分区。 Apache RocketMQ 通过流式特性的无限队列结构来存储消息,消息在队列内具备顺序性存储特征。

  • 消息(Message)

    Apache RocketMQ 的最小传输单元。消息具备不可变性,在初始化发送和完成存储后即不可变。

消息消费

  • 消费者分组(ConsumerGroup)

    Apache RocketMQ 发布订阅模型中定义的独立的消费身份分组,用于统一管理底层运行的多个消费者(Consumer)。同一个消费组的多个消费者必须保持消费逻辑和配置一致,共同分担该消费组订阅的消息,实现消费能力的水平扩展。

  • 消费者(Consumer)

    Apache RocketMQ 消费消息的运行实体,一般集成在业务调用链路的下游。消费者必须被指定到某一个消费组中。

  • 订阅关系(Subscription)

    Apache RocketMQ 发布订阅模型中消息过滤、重试、消费进度的规则配置。订阅关系以消费组粒度进行管理,消费组通过定义订阅关系控制指定消费组下的消费者如何实现消息过滤、消费重试及消费进度恢复等。

    Apache RocketMQ 的订阅关系除过滤表达式之外都是持久化的,即服务端重启或请求断开,订阅关系依然保留。

一对多的订阅机制,订阅机制维护了消息过滤、重试、消费进度的规则配置

原理

  • consumerQueue里面保存的是消息指针,所有的消息会按顺序保存在这一条队列中,这也是RocketMQ保障消息顺序的关键

image-20250316222232156

640

消费者

image-20241128190951532

Push

Pull

短轮询

消费端拉取信息

image-20241128191039192

长轮询

消费端拉取信息

image-20241128191227071

事务消息

可以让我们确保发送的消息一定能写进MQ里,绝不会丢失掉

事务消息提供了:回滚、提交机制

原理

事务消息使用示例:

  1. 事务成功

f8017efe552e36ec636ecf126d3fa732

  1. 事务失败

f614c6180816f2e200d3fb643ecd7390

假如half消息发送成功了,生产者没收到响应怎么办?

这个half你可以理解为一个试探消息

MQ事务消息流程就是上面的,但是我们来进行比较严谨的分析,如果我们把half消息发送给MQ了,MQ给保存下来了,但是MQ返回给我们的响应生产者没收到,会怎么样?

这时候,生产者会误以为发送到MQ的half消息失败了,就不会执行后边的流程。

但MQ已经保存下来了一条half消息,这个消息怎么处理?

其实RocketMQ这里有一个补偿机制,他会去扫描自己处于half状态的消息,如果我们一直没有对这个消息执行commit或rollback操作,超过了一定的时间,他就会回调你的订单系统的一个接口,看看你这个消息什么情况,你生产者到底是打算commit这个消息,还是打算rollback这个消息?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TransactionalMessageProducer {

public static void main(String[] args) throws Exception {
// 创建事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroup");
producer.setNamesrvAddr("localhost:9876");

// 设置事务监听器
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
try {
// 模拟本地事务执行
boolean isSuccess = executeLocalBusiness();
if (isSuccess) {
// 本地事务成功,提交消息
return LocalTransactionState.COMMIT_MESSAGE;
} else {
// 本地事务失败,回滚消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
// 本地事务执行异常,返回未知状态,等待回查
return LocalTransactionState.UNKNOW;
}
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 处理 Half 消息未处理超时的情况
System.out.println("Half 消息未处理超时,执行回查逻辑...");

// 检查本地事务状态
boolean isLocalTransactionSuccess = checkLocalTransactionStatus(msg);
if (isLocalTransactionSuccess) {
// 本地事务已成功,提交消息
return LocalTransactionState.COMMIT_MESSAGE;
} else {
// 本地事务失败,回滚消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
});

// 启动生产者
producer.start();

// 创建消息
Message msg = new Message("TransactionTopic", "TagA", "Hello RocketMQ Transaction".getBytes());

// 发送事务消息
producer.sendMessageInTransaction(msg, null);

// 关闭生产者
producer.shutdown();
}

private static boolean executeLocalBusiness() {
// 模拟本地事务执行逻辑
// 这里可以根据业务需求执行数据库操作或其他业务逻辑
// 返回 true 表示本地事务成功,false 表示失败
return false; // 模拟本地事务失败
}

private static boolean checkLocalTransactionStatus(MessageExt msg) {
// 模拟检查本地事务状态的逻辑
// 这里可以查询数据库或调用其他服务,确认本地事务是否成功
// 返回 true 表示本地事务成功,false 表示失败
return true; // 模拟本地事务已成功
}
}

顺序消息

https://chat.deepseek.com/a/chat/s/89b0e6f7-11a4-48cf-addd-1ce64e14f607

使用场景

  1. 订单状态变更
    • 订单的状态变更(如创建、支付、发货、完成)必须按照严格的顺序处理,否则会导致业务逻辑错误。
  2. 日志处理
    • 日志消息需要按照时间顺序处理,以确保日志的完整性和正确性。
  3. 库存扣减
    • 多个库存操作(如扣减、回滚)需要按照顺序执行,避免库存数据不一致。
  4. 消息队列中的依赖关系
    • 某些消息之间存在依赖关系,必须按照特定顺序处理。

顺序消息的实现原理

RocketMQ 通过以下机制保证顺序消息的顺序性:

  1. 分区有序
    • 消息发送时,通过 MessageQueueSelector 将同一业务标识(如订单 ID)的消息发送到同一个队列(MessageQueue)。
    • 消费者从同一个队列中顺序消费消息。
  2. 全局有序
    • 将所有消息发送到同一个队列,严格保证全局顺序。
    • 但这种方式会降低并发性能,通常不推荐使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

public class OrderedProducer {

public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("OrderedProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();

// 模拟订单状态变更消息
String[] orderStatus = {"创建订单", "支付订单", "发货订单", "完成订单"};
for (int orderId = 1; orderId <= 10; orderId++) {
for (int statusIndex = 0; statusIndex < orderStatus.length; statusIndex++) {
// 创建消息
String messageBody = "订单ID: " + orderId + ", 状态: " + orderStatus[statusIndex];
Message msg = new Message("OrderTopic", "TagA", messageBody.getBytes());

// 发送顺序消息
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 根据订单ID选择队列,确保同一订单的消息发送到同一个队列
int orderId = (int) arg;
int index = orderId % mqs.size();
return mqs.get(index);
}
}, orderId); // 将订单ID作为参数传递

System.out.println("发送消息: " + messageBody + ", 发送结果: " + sendResult);
}
}

// 关闭生产者
producer.shutdown();
}
}

与RabbitMQ的区别

  • RocketMQ 更适合 大规模分布式系统,强调高吞吐、顺序消息和事务一致性。
  • RabbitMQ 更适合 企业级应用集成,强调灵活路由和低延迟。

架构设计对比

1. RocketMQ

  • 核心架构
    RocketMQ 采用 分布式架构,包含四个核心组件:
    • NameServer:轻量级元数据管理(Topic、Broker路由信息),无状态且支持集群。
    • Broker:消息存储与转发节点,支持主从复制和分片(Sharding)。
    • Producer:消息生产者,支持同步/异步发送、事务消息。
    • Consumer:消息消费者,支持集群消费(负载均衡)和广播消费。
  • 消息模型
    • 基于 发布-订阅模型(Topic),支持消息的 顺序消费延迟消息
    • 通过 CommitLog 统一存储消息,利用 ConsumeQueueIndexFile 实现高效查询。
  • 数据持久化
    消息默认持久化到磁盘,支持同步/异步刷盘策略,适合高吞吐、高可靠场景。

2. RabbitMQ

  • 核心架构
    RabbitMQ 基于 AMQP 协议,核心组件包括:
    • Exchange:消息路由组件,根据规则(Direct、Topic、Fanout、Headers)分发消息到队列。
    • Queue:消息存储队列,支持持久化和内存存储。
    • Producer:消息生产者,通过 Exchange 路由消息。
    • Consumer:消息消费者,通过订阅队列消费消息。
  • 消息模型
    • 基于 队列模型,支持灵活的路由规则(如 Topic 匹配、Header 过滤)。
    • 强调 消息确认机制(ACK/NACK)和 事务消息
  • 数据持久化
    消息可配置为持久化到磁盘,但默认存储在内存中,适合低延迟场景。

核心特性对比

特性 RocketMQ RabbitMQ
消息顺序 支持严格顺序消息(通过队列分片) 仅单队列内保证顺序
吞吐量 高吞吐(10万+/秒),适合大数据场景 中等吞吐(万级/秒)
延迟消息 支持固定精度延迟消息(18个级别) 通过插件支持任意延迟
事务消息 原生支持分布式事务(两阶段提交) 通过插件或事务确认机制实现
消息回溯 支持按时间点回溯消费 不支持
协议支持 自定义协议(轻量高效) AMQP 协议(标准化,多语言支持好)
扩展性 水平扩展(Broker集群分片) 垂直扩展(集群镜像队列)

与Kafaka的区别

Kafaka的生产者和消费者的消息选用了同一份存储结构

image-20250316223145390

image-20250316222941845