根据公司项目的一个使用场景,针对实时预警任务,会有同时消费某个设备正常或异常的事件,而这类事件具有时间维度上的先后性,需要符合 先进先出(FIFO)原则进行发布和消费,RocketMQ顺序消费脱颖而出,而MQ消费为了吞吐量基本会使用多线程消费,RocketMQ 也会分为多个消息队列,会造成消费无序的情况,这时候为了综合兼顾顺序消费+性能的考虑RocketMQ 5.x
在顺序消费继续优化,比如新增了 消息组(MessageGroup)等概念,顺序消费流程更加易于理解。
生产顺序性
Apache RocketMQ 5.x
生产顺序性由以下几点保证:
同一消息组(MessageGroup)
生产者发送消息时可以为每条消息设置消息组,只有同一消息组内的消息可以保证顺序性,不同消息组或未设置消息组的消息之间不保证顺序。
单一生产者
消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。
串行发送
云消息队列 RocketMQ 版生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。
满足以上条件的生产者,将顺序消息发送至云消息队列 RocketMQ 版后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:
如上图所示消息组1
和 消息组4
的消息混合存储在 队列1
中, Apache RocketMQ
保证 消息组1
中的消息 G1-M1G1-M2G1-M3
是按发送顺序存储,且 消息组4
的消息 G4-M1
G4-M2
也是按顺序存储,但 消息组1
和 消息组4
中的消息不涉及顺序关系。
消费顺序性
在顺序消息中,消息的顺序性指的是同一消息组内的多个消息之间的先后顺序。因此,顺序消息场景下,消息粒度负载均衡策略还需要保证同一消息组内的消息,按照服务端存储的先后顺序进行消费。不同消费者处理同一个消息组内的消息时,会严格按照先后顺序锁定消息状态,确保同一消息组的消息串行消费。
如上图所述,队列Queue1中有4条顺序消息,这4条消息属于同一消息组G1,存储顺序由M1到M4。在消费过程中,前面的消息M1、M2被消费者Consumer A1处理时,只要消费状态没有提交,消费者A2是无法并行消费后续的M3、M4消息的,必须等前面的消息提交消费状态后才能消费后面的消息。
Apache RocketMQ
通过下面两个条件保障消息消费严格按照存储的先后顺序来处理。
消费的有序性
在保证生产顺序性的前提下,业务方消费消息时需要严格按照 接收---处理---应答 的语义处理消息,避免因异步处理导致消息乱序。
消费者必须注册
MessageListenerOrderly
监听器,而不是MessageListenerConcurrently
。MessageListenerOrderly
的核心在于它对 每个MessageQueue
的消费加锁,并保证在队列级别串行处理。工作流程:
Consumer
实例拉取到分配给它的MessageQueue
上的消息。对于 每个
MessageQueue
,RocketMQ
客户端内部会维护一个锁(或类似机制)。同一时间,一个
MessageQueue
只允许一个消费线程进行处理(即使消费者配置了多个消费线程consumeThreadMin/consumeThreadMax
)。消费线程在处理一个队列的消息时会先获取该队列的锁。消费线程调用用户实现的
consumeMessage
方法处理一批消息(List<MessageExt>
)。这批消息来自 同一个队列且是连续的。用户业务逻辑执行完毕后,消费线程返回消费状态
ConsumeOrderlyStatus
(SUCCESS
或SUSPEND_CURRENT_QUEUE_A_MOMENT
)。只有当前批次的消息成功消费(返回
SUCCESS
)后,才会提交消费位点 (commit offset
),并释放该队列的锁。 然后才能拉取和处理该队列的下一个批次消息。如果消费失败(返回
SUSPEND_CURRENT_QUEUE_A_MOMENT
或抛出异常),RocketMQ 会在稍后(默认间隔 1 秒)重试消费当前批次的消息,并且不会提交位点,也不会释放队列锁。这意味着该队列的处理会被阻塞,直到当前批次成功消费。重试次数达到最大值(可配置)后会进入死信队列。
有限重试
如果投递消费过程中出现错误,消息重试会影响到消费有序性和消费效率Apache RocketMQ
顺序消息消费失败进行消费重试时,为保障消息的顺序性,后续消息不可被消费,必须等待前面的消息消费完成后才能被处理。顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理。
对于需要严格保证消费顺序的场景,请务设置合理的重试次数。
使用示例
环境准备
以windows docker环境为背景,搭建一个5.x版本。
准备docker compose脚本
version: '3.8'
services:
namesrv:
image: apache/rocketmq:5.3.0
container_name: rmqnamesrv
ports:
- 10908:9876
networks:
- rocketmq
command: sh mqnamesrv
broker:
image: apache/rocketmq:5.3.0
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
- 10912:10912
volumes:
- C:\Users\wayne\docker\RocketMQ\data:/home/rocketmq/store
environment:
- NAMESRV_ADDR=rmqnamesrv:9876
depends_on:
- namesrv
networks:
- rocketmq
command: sh mqbroker
proxy:
image: apache/rocketmq:5.3.0
container_name: rmqproxy
networks:
- rocketmq
depends_on:
- broker
- namesrv
ports:
- 8080:8080
- 8081:8081
restart: on-failure
environment:
- NAMESRV_ADDR=rmqnamesrv:9876
command: sh mqproxy
dashboard:
image: apacherocketmq/rocketmq-dashboard:latest
container_name: rocketmq-dashboard
networks:
- rocketmq
depends_on:
- broker
- namesrv
ports:
- 10907:8080
restart: unless-stopped
environment:
- JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv:9876
networks:
rocketmq:
driver: bridge
启动项目
docker-compose -f XXXXX\XXXXX\rocketmq-5.3.0-docker-compose.yml -p rockermq_project up -d
进入broker容器,执行一些初始化操作
$ docker exec -it rmqbroker bash
创建FIFO主题
顺序消息仅支持使MessageTypeFIFO
的主题,即顺序消息只能发送至类型为顺序消息的主题中,发送的消息的类型必须和主题的类型一致。
// -c 集群名称
// -t Topic名称
// -n Nameserver地址
// -o 创建顺序消息主题
sh mqadmin updateTopic -c DefaultCluster -t FIFOTopic -o true -a +message.type=FIFO
创建两个订阅消费组
// -c 集群名称
// -g ConsumerGroup名称
// -n Nameserver地址
// -o 创建顺序订阅消费组
sh mqadmin updateSubGroup -c DefaultCluster -g FIFOGroup -o true
sh mqadmin updateSubGroup -c DefaultCluster -g FIFOGroup01 -o true
发送消息
和普通消息发送相比,顺序消息发送必须要设置 消息组MessageGroup(不是消费组) 。消息组的粒度建议按照业务场景,尽可能细粒度设计,以便实现业务拆分和并发扩展。
@Test
public void sendMessage() throws ClientException {
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081
String endpoint = "123.123.8.2:8080";
// 消息发送的目标Topic名称,需要提前创建。
String topic = "FIFOTopic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
ClientConfiguration configuration = builder.build();
// 初始化Producer时需要设置通信配置以及预绑定的Topic。
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
//顺序消息发送。
MessageBuilder messageBuilder = new MessageBuilderImpl();
Message message = messageBuilder.setTopic(topic)
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
//设置顺序消息的排序分组,该分组尽量保持离散,避免热点排序分组。
.setMessageGroup("fifoGroup001")
//消息体。
.setBody("messageBody".getBytes())
.build();
try {
//发送消息,需要关注发送结果,并捕获失败等异常
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
}
接收消息
如上图所示RocketMQ
消费者处理消息时主要经过以下阶段:消息获取—>消息处理—>消费状态提交。
针对以上几个阶段RocketMQ
提供了不同的消费者类型PushConsumer
和 SimpleConsumer
。这两种类型的消费者通过不同的实现方式和接口可满足您在不同业务场景下的消费需求。具体差异如下:
PushConsumer
消费者类型PushConsumer
时,云消息队列 RocketMQ 版保证消息按照存储顺序一条一条投递给消费者。
@Test
public void receiveMessage01() throws ClientException, InterruptedException {
//消费示例:使用PushConsumer消费普通消息。
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "FIFOTopic";
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081
String endpoint = "123.123.8.2:8080";
String consumerGroup = "FIFOGroup";
FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
provider.newPushConsumerBuilder()
//设置消费者分组。
.setConsumerGroup(consumerGroup)
//设置接入点。
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints(endpoint).build())
//设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
//设置消费监听器。
.setMessageListener(messageView -> {
System.out.println(messageView);
//消费消息并返回处理结果。
return ConsumeResult.SUCCESS;
}).build();
Thread.sleep(1000000);
}
SimpleConsumer
若消费者类型为SimpleConsumer,则消费者有可能一次拉取多条消息。此时,消息消费的顺序性需要由业务方自行保证。
@Test
public void receiveMessage02() throws ClientException {
//消费示例二:使用SimpleConsumer消费顺序消息,主动获取消息进行消费处理并提交消费结果。
//需要注意的是,同一个MessageGroup的消息,如果前序消息没有消费完成,再次调用Receive是获取不到后续消息的。
String topic = "FIFOTopic";
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081
String endpoint = "123.123.8.2:8080";
String consumerGroup = "FIFOGroup01";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
ClientConfiguration configuration = builder.build();
// 订阅消息的过滤规则,“*”表示订阅多有tag消息
FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
.setConsumerGroup(consumerGroup)// 设置消费者分组
.setClientConfiguration(configuration)
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))// 设置预绑定的订阅关系
.setAwaitDuration(Duration.ofSeconds(30))// 设置从服务端接收消息的最大等待时间
.build();
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//消费处理完成后,需要主动调用ACK提交消费结果。如果处理失败,如果处理失败不需要回复ACK响应,即可在定义的消费不可见时间到达后触发消费重试流程。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
e.printStackTrace();
}
}
使用建议
串行消费,避免批量消费导致乱序
消息消费建议串行处理,避免一次消费多条消息,否则可能出现乱序情况。
例如:发送顺序为 1->2->3->4
,消费时批量消费,消费顺序为 1->23(批量处理,失败)->23(重试处理)->4
,此时可能由消息3
的失败导消息2
被重复处理,最后导致消息消费乱序。
消息组尽可能打散,避免集中导致热点
RocketMQ
保证相同消息组的消息存储在同一个队列中,如果不同业务场景的消息都集中在少量或一个消息组中,则这些消息存储压力都会集中到服务端的少量队列或一个队列中。容易导致性能热点,且不利于扩展。一般建议的消息组设计会采用订单ID、用户ID作为顺序参考,即同一个终端用户的消息保证顺序,不同用户的消息无需保证顺序。
因此建议将业务以消息组粒度进行拆分,例如,将订单ID、用户ID作为消息组关键字,可实现同一终端用户的消息按照顺序处理,不同用户的消息无需保证顺序。
鸣谢
修订记录
2024.10.09 - 针对5.x版本内容重新增补。
文章迁移oak