wayne
wayne
发布于 2025-04-23 / 7 阅读
0
0

RocketMQ - 从消息可靠传输谈高可用

Apache RocketMQ 作为阿里巴巴开源的一款分布式消息中间件,凭借其高吞吐、低延迟、高可用等特性,成为金融级稳定性场景的首选解决方案。本文将深入剖析 RocketMQ 的架构设计,解读其核心组件、存储机制和高可用策略。

RocketMQ 核心架构

RocketMQ 采用经典的 发布-订阅模型,核心组件包括:

  • NameServer:轻量级注册中心,负责路由管理。

  • Broker:消息存储与转发核心节点。

  • Producer:消息生产者,支持同步/异步/单向发送。

  • Consumer:消息消费者,支持集群和广播消费模式。

核心组件详解

主从架构

  • Broker 集群:每个 Broker 分为 MasterSlave 角色,Master 负责读写,Slave 作为热备。

    • 同步复制(SYNC_MASTER):消息写入 Master 后,需等待 Slave 同步完成才返回成功,保证数据强一致(牺牲部分性能)。

    • 异步复制(ASYNC_MASTER):消息写入 Master 后立即返回成功,Slave 异步复制,性能更高但可能丢失少量数据。

  • NameServer 节点间的路由信息可能存在短暂不一致(如某个 NameServer 尚未收到 Broker 的最新心跳),但通过以下机制保证最终一致:

    • Broker 向所有 NameServer 周期性上报心跳,存活节点最终会同步最新状态。

    • 客户端定时拉取路由表,覆盖旧缓存。

  • 故障自动切换

    • 基于 Dledger(Raft 协议实现):在 RocketMQ 4.5+ 版本中,Dledger 实现自动选主,当 Master 宕机时,Slave 自动晋升为新 Master,避免单点故障。

    • 旧版本通过 HA(High Availability) 服务手动切换。

  • 客户端缓存路由信息:Producer 和 Consumer 会缓存 Topic 的路由信息,即使 NameServer 短暂不可用,仍能继续收发消息。

NameServer:无状态路由中枢

  • 去中心化设计:多个 NameServer 节点独立运行,无主从依赖。

  • 职责

    • 管理 Broker 地址列表与 Topic 路由信息。

    • 提供心跳检测机制,实时感知 Broker 存活状态。

  • 数据更新:Broker 每30秒上报元数据,NameServer 维护最终一致性。

Broker:消息存储引擎

  • 主从架构:Master 处理读写请求,Slave 提供数据备份和读负载均衡。

  • 核心模块

    • Remoting Module:处理客户端请求(生产/消费消息)。

    • Store Module:高效管理消息存储(CommitLog + 索引)。

    • HA Service:主从数据同步,保障高可用。

Producer & Consumer

  • Producer

    • 通过 Topic 发布消息,支持消息压缩、事务消息。

    • 自动选择 MessageQueue 实现负载均衡。

  • Consumer

    • 支持 Push/Pull 两种消费模式。

    • 集群模式下通过 Offset 管理消费进度。

消息存储机制:高性能的基石

存储结构设计

  • CommitLog:所有消息顺序写入日志文件,消除随机IO瓶颈。

  • ConsumeQueue:逻辑队列,记录消息在 CommitLog 的物理偏移。

  • IndexFile:基于 Key/时间戳的消息索引,支持快速查询。

CommitLog
├── 00000000000000000000
├── 00000000000000000001
└── ...

ConsumeQueue
└── TopicA
    ├── 0 (QueueID)
    │   ├── 00000000000000000000
    │   └── ...
    └── 1
        └── ...

刷盘机制

  • 同步刷盘(FLUSH_SYNC):消息写入 PageCache 后立即刷盘,数据零丢失。

  • 异步刷盘(FLUSH_ASYNC):依赖 OS 定期刷盘,吞吐量更高。

零拷贝技术

  • MappedFile:通过内存映射文件(MMAP)提升大文件读写效率。

  • Sendfile:消费时直接通过 DMA 传输数据,减少 CPU 拷贝开销。

高可用与负载均衡设计

Broker 主从同步

  • 同步复制(SYNC_MASTER):消息写入 Slave 成功后才返回 ACK。

  • 异步复制(ASYNC_MASTER):Master 写入后立即响应,异步同步到 Slave。

故障自动切换

  • DLedger 模式:基于 Raft 协议实现多副本强一致性,支持自动选主。

    • Dledger集群的选举是通过Raft协议进⾏的,Raft协议是⼀种多数同意机制。也就是每次选举需要有集群中超过

      半数的节点确认,才能形成整个集群的共同决定。同时,这也意味着在Dledger集群中,只要有超过半数的节点能

      够正常⼯作,那么整个集群就能正常⼯作。因此,在部署Dledger集群时,通常都是部署奇数台服务,这样可以让

      集群的容错性达到最⼤。

  • Consumer 重平衡:Broker 宕机时,Consumer 自动切换到其他可用节点。

负载均衡策略

  • Producer 侧:轮询/随机/Hash 算法分配 MessageQueue。

  • Consumer 侧

    • 平均分配(AllocateMessageQueueAveragely)

    • 一致性 Hash(AllocateMessageQueueConsistentHash)

消息可靠传输

从消息可靠传输可以看到很多RocketMQ高可用的设计。

生产端

使用Dledger集群方案

DLedger是一套基于Raft协议的分布式日志存储组件,在使用Dledger技术搭建的RocketMQ集群中,Dledger会通过两阶段提交的方式保证文件在主从之间成功同步。

配置同步刷盘

SYNC\_FLUSH(同步刷新)相比于ASYNC\_FLUSH(异步处理)会损失很多性能,但是也更可靠,所以需要根据实际的业务场景做好权衡。

使用事务消息

基于Apache RocketMQ实现的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。

事务消息原理

两阶段提交流程
  1. 半消息(Half Message)

    • 生产者发送消息到 Broker,消息标记为 TRAN_MSG=true,暂存到内部 Topic(RMQ_SYS_TRANS_HALF_TOPIC),对消费者不可见

    • Broker 持久化成功后返回 Ack 确认。

  2. 执行本地事务

    • 生产者收到 Ack 后执行本地事务(如更新数据库),并根据结果向 Broker 发送二次确认:

      • COMMIT:半消息转移到业务 Topic,对消费者可见。

      • ROLLBACK:删除半消息。

      • UNKNOW:触发事务回查。

  3. 事务状态回查

    • 若 Broker 未收到二次确认(如网络闪断),会定时(默认间隔 60 秒)向生产者发起回查请求(CHECK_TRANSACTION_STATE110

    • 生产者通过 checkLocalTransaction 方法检查本地事务状态,重新提交 Commit/Rollback46

    • 回查次数限制:默认最多 15 次(由 transactionCheckMax 控制),超时则强制回滚510

关键设计
  • 去重与一致性:通过事务 ID(transactionId)关联半消息与本地事务,避免重复处理4

  • 存储优化:半消息单独存储,Commit 后转移到业务 Topic,通过 OP 队列(RMQ_SYS_TRANS_OP_HALF_TOPIC)记录操作日志7


💻 二、使用示例(Java)

实现事务监听器
public class TransactionListenerImpl implements TransactionListener {
    private final ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    // 第一阶段:执行本地事务
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String orderId = (String) arg;
        try {
            // 1. 执行本地事务(如更新订单状态)
            boolean success = updateOrderStatus(orderId, "PAID");
            // 2. 根据结果返回状态
            return success ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
        } catch (Exception e) {
            // 3. 异常返回 UNKNOW,触发回查
            return LocalTransactionState.UNKNOW;
        }
    }

    // 第二阶段:事务回查
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String orderId = msg.getKeys();
        // 查询数据库确认事务状态
        String status = queryOrderStatus(orderId);
        if ("PAID".equals(status)) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if ("CANCELED".equals(status)) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return LocalTransactionState.UNKNOW; // 继续等待回查
    }
}
发送事务消息
public class TransactionProducer {
    public static void main(String[] args) throws Exception {
        // 1. 创建事务生产者
        TransactionMQProducer producer = new TransactionMQProducer("tx_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        
        // 2. 设置事务监听器及线程池
        producer.setTransactionListener(new TransactionListenerImpl());
        producer.setExecutorService(Executors.newFixedThreadPool(10));
        producer.start();

        // 3. 构造消息(以订单ID为例)
        Message msg = new Message("ORDER_TOPIC", "PAY", 
                "ORDER_1001", "Pay success".getBytes());
        msg.putUserProperty("orderId", "ORDER_1001"); // 设置业务Key
        
        // 4. 发送事务消息(传递业务参数orderId)
        SendResult result = producer.sendMessageInTransaction(msg, "ORDER_1001");
        System.out.println("Send status: " + result.getSendStatus());
        
        producer.shutdown();
    }
}
消费者(与普通消息一致)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tx_consumer_group");
consumer.subscribe("ORDER_TOPIC", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        System.out.println("Consume: " + new String(msg.getBody()));
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();

高级配置与注意事项

1. 参数调优

参数

默认值

说明

transactionCheckInterval

60 秒

Broker 回查间隔

transactionCheckMax

15 次

最大回查次数,超时强制回滚

transactionTimeout

6 秒

半消息超时时间,可消息级覆盖

消息属性 CHECK_IMMUNITY_TIME

-

设置单条消息免回查时间(优先级高于全局)

示例:设置消息免回查时间为 10 秒

msg.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, "10");
2. 使用限制
  • 禁止场景

    • 不支持延迟消息、批量消息。

    • 生产者组不可与其他类型消息共用。

  • 消费端保证

    • 事务消息仅确保生产者端一致性,消费端需自行处理幂等(如通过订单ID去重)

💎 四、典型场景案例

电商支付流程

  1. 用户支付 → 2. 订单系统发送半消息 → 3. 更新订单状态为“已支付” → 4. Commit 消息 → 5. 下游系统(物流、积分)消费消息。
    若步骤 3 失败,则回滚消息,避免下游误发货29

观察回执状态

对于消费可靠传输,先关注下生产端的回执状态,RocketMQ 的同步发送(syncSend)和异步发送(asyncSend)均会返回以下状态(通过 SendStatus 枚举类定义):

SEND_OK

* 含义:消息发送成功,且满足 Broker 的持久化策略。

* 触发条件

* 消息已成功写入 Broker 的 CommitLog。

* 若 Broker 配置为 SYNC_MASTER(同步复制),需主节点和从节点均刷盘完成。

* 若配置为 ASYNC_MASTER(异步复制),只需主节点刷盘完成。

* 处理建议

* 业务可认为消息已可靠存储,无需重试。

* 需注意SEND_OK 仅表示 Broker 接收成功,不保证消费者一定能消费到(例如网络故障导致未推送)。

FLUSH_DISK_TIMEOUT

* 含义:Broker 刷盘超时。

* 触发条件

* Broker 的刷盘策略为 SYNC_FLUSH(同步刷盘)时,等待刷盘超时(默认5秒)。

* 磁盘 I/O 压力大或故障导致无法及时刷盘。

* 处理建议

* 消息可能未持久化到磁盘,存在丢失风险。

* 需重试发送,并监控 Broker 磁盘状态。

FLUSH_SLAVE_TIMEOUT

* 含义:主节点同步到从节点(Slave)超时。

* 触发条件

* Broker 角色为 SYNC_MASTER(同步复制)。

* 主节点等待 Slave 同步确认超时(默认5秒)。

* 处理建议

* 消息在主节点已持久化,但未同步到 Slave。

* 需重试发送,避免主节点宕机后数据丢失。

SLAVE_NOT_AVAILABLE

* 含义:从节点(Slave)不可用。

* 触发条件

* Broker 角色为 SYNC_MASTER,但当前无可用 Slave。

* Slave 节点宕机或网络隔离。

* 处理建议

* 主节点已接收消息,但无法保证高可用。

* 需检查 Slave 状态,必要时修复集群。

Keys的使用

每个消息在业务层面一般建议映射到业务的唯一标识并设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。常见的设置策略使用订单Id、用户Id、请求Id等比较离散的唯一标识来处理。

消费端

不要使用异步消费

正常情况下,消费者端都是需要先处理本地事务,然后再给MQ一个ACK响应,这时MQ就会修改Offset,将消息标记为已消费,从而不再往其他消费者推送消息。所以在Broker的这种重新推送机制下,消息是不会在传输过程中丢失的。但是也会有下面这种情况会造成服务端消息丢失:

 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

                                                            ConsumeConcurrentlyContext context) {

                new Thread(){

                    public void run(){

                        //处理业务逻辑

                        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);

                    }

                };

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

            }

        });

这种异步消费的方式,就有可能造成消息状态返回后消费者本地业务逻辑处理失败造成消息丢失的可能。

RocketMQ不可用降级方案

在这种情况下,RocketMQ相当于整个服务都不可用了,那他本身肯定无法给我们保证消息不丢失了。我们只能自己设计一个降级方案来处理这个问题了。例如在订单系统中,如果多次尝试发送RocketMQ不成功,那就只能另外找给地方,比如本地事务表,把订单消息缓存下来,然后起一个线程定时的扫描这些失败的订单消息,尝试往RocketMQ发送。这样等RocketMQ的服务恢复过来后,就能第一时间把这些消息重新发送出去。

RocketMQ 消息堆积处理方案

RocketMQ 消息堆积的本质是消费速度落后于生产速度,需从生产端、Broker 端、消费端协同优化。以下是系统性解决方案:


🔍 一、快速定位堆积根源

  1. 检查堆积位置

    • 控制台监控:通过 RocketMQ 控制台查看 accumulationCount 指标,确认堆积在 Broker(未投递)还是消费者本地(已拉取未处理)16

    • 日志分析:搜索消费者日志 ons.log,若出现 the cached message count exceeds the threshold,说明堆积在消费者本地缓存队列1

  2. 诊断消费瓶颈

    • 消费耗时:通过控制台查看消息轨迹,分析单条消息消费耗时(正常应 <1s)1

    • 线程堆栈:使用 jstack 抓取消费线程(ConsumeMessageThread)状态,检查是否阻塞于外部调用(如数据库、网络IO)14


⚙️ 二、分级解决方案

✅ 1. 消费端优化(最常见)

  • 增加并发度

    • 水平扩容:增加消费者实例数,确保 实例数 ≤ 队列数(MessageQueue)。若队列不足,需先扩容队列:

      mqadmin updateTopic -n nameserver:9876 -t YourTopic -r 32 -w 32  # 队列从16扩至32:cite[2]:cite[6]
    • 线程调优:调整消费线程数(IO密集型建议 2*CPU核数):

      java

      consumer.setConsumeThreadMin(20);
      consumer.setConsumeThreadMax(64);  // RocketMQ 消费者设置:cite[2]:cite[7]
  • 异步化+批量消费

    • 解耦消息拉取与业务处理,使用线程池异步处理:

      java

      consumer.registerMessageListener((msgs, context) -> {
          executorService.submit(() -> processBatch(msgs)); // 异步提交线程池
          return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }):cite[2]:cite[6]
    • 开启批量拉取(每次最多32条):

      java

      consumer.setConsumeMessageBatchMaxSize(32);  // 提升吞吐量:cite[2]
  • 简化消费逻辑

    • 临时跳过非关键操作(如日志记录、冗余校验)3

    • 异常捕获避免单条消息阻塞整个线程4

🚦 2. 生产端限流(防堆积恶化)

  • 控制发送速率

    java

    producer.setSendMsgRate(500); // 限流500条/秒:cite[5]:cite[7]
  • 启用批量发送:合并消息减少网络开销:

    java

    List<Message> batchMsgs = new ArrayList<>();
    for (int i=0; i<100; i++) { batchMsgs.add(new Message(topic, tag, body)); }
    producer.send(batchMsgs);  // 批量发送:cite[2]
  • 异步发送+失败重试:避免阻塞生产者线程25

💾 3. Broker 端调优(提升吞吐)

  • 存储优化

    • 使用 SSD 替换机械硬盘,调整刷盘策略为异步(牺牲部分可靠性换性能):

      properties

      flushDiskType=ASYNC_FLUSH   # broker.conf 配置:cite[2]:cite[10]
  • 线程池扩容

    properties

      sendMessageThreadPoolNums=32  # 发送线程数
      pullMessageThreadPoolNums=32  # 拉取线程数:cite[2]

🆘 4. 积压消息紧急处理

  • 跳过堆积消息(慎用!):

    bash

    mqadmin resetOffsetByTime -n nameserver:9876 -g ConsumerGroup -t Topic -s now  # 重置位点到最新:cite[1]:cite[8]
  • 分流消费(适用于队列不足):

    1. 新建临时 Topic(队列数为原10倍)

    2. 编写中转消费者:拉取积压消息 → 均匀写入新 Topic

    3. 部署10倍消费者实例消费新 Topic68

  • 死信队列(DLQ):隔离重复失败的消息,避免阻塞正常队列:

    java

    consumer.setMaxReconsumeTimes(3);  // 最大重试3次:cite[2]:cite[7]

最佳实践场景

  • 电商场景:订单超时取消(延时消息)

  • 日志采集:海量数据传输削峰填谷

  • 金融交易:跨系统事务最终一致性(事务消息)

总结

RocketMQ 通过分层架构设计,在存储效率、集群扩展性和数据可靠性之间实现完美平衡。其设计理念值得分布式系统开发者深入借鉴:

  1. 顺序写 + 零拷贝 → 极致 IO 性能

  2. 主从分离 + 多副本 → 金融级高可用

  3. 轻量级 NameServer → 避免单点瓶颈


评论