
MQ
RocketMQ
重要概念
- Topic(主题)
- Queue(队列):一个Topic由多个Queue组成,消息被均匀地分布在这些Queue中。队列是消息存储和传输的实际载体。
- Consumer Group(消费者组)
- Consumer Instance(消费者实例)
一个Queue只能被同一个Consumer Group下的一个Consumer Instance消费。但一个Consumer Instance可以同时消费多个Queue。
队列是什么?和Topic有什么关系?
text
队列(Queue) 是Topic的分区(Partition) 单位。当你创建一个Topic时,必须指定它包含多少个Queue(例如8个、16个)。
消息不是直接存在Topic里的,而是存储在Topic下的各个Queue中。生产者发送消息到某个Topic,这条消息会根据一定的策略(如轮询、哈希)被路由到该Topic下的某一个Queue里。
关系:Topic是逻辑概念,Queue是物理概念。Topic是逻辑上的消息分类,而Queue是实际承载消息的实体。可以把Topic理解成一个由多个Queue组成的逻辑队列。
在创建Topic的时候要指定队列么?
text
是的,必须指定。 这是在创建Topic时最重要的参数之一(通常在管理控制台或通过命令创建时指定)。
例如,创建命令通常是这样的:
sh mqadmin updateTopic -n 127.0.0.1:9876 -c DefaultCluster -t YOUR_TOPIC_NAME -r 8 -w 8
这里的 -r 8 -w 8 通常就表示读写队列数各为8个。
RocketMQ消费模式有几种?
消费模型由Consumer决定,消费维度为Topic。
- 集群消费
- 一条消息只会被同Group中的一个Consumer消费
- 多个Group同时消费一个Topic时,每个Group都会有一个Consumer消费到数据
- 广播消费
消息将对一 个Consumer Group 下的各个 Consumer 实例都消费一遍。即即使这些 Consumer 属于同一个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一次。
RocketMQ消费消息是push还是pull?
RocketMQ没有真正意义的push,都是pull,虽然有push类,但实际底层实现采用的是长轮询机制,即拉取方式
broker端属性 longPollingEnable 标记是否开启长轮询。默认开启
一个队列只能被一个同Group的消费者实例消费如何理解?
这意味着对于同一个消费者组(Consumer Group)来说,它的所有实例会共同协商(通过RocketMQ的负载均衡机制)来分配某个Topic下的所有Queue。
场景一:消费者实例数 = 队列数(理想状态)
- Topic
OrderTopic
有 4 个 Queue (Q1, Q2, Q3, Q4)。 - 你的消费者组
OrderGroup
启动了 4 个实例 (C1, C2, C3, C4)。 - 负载均衡后,分配结果是:
- C1 -> Q1
- C2 -> Q2
- C3 -> Q3
- C4 -> Q4
- 每个消费者实例都“认领”了一个队列,分工明确,效率最高。
- Topic
场景二:消费者实例数 < 队列数(很常见)
OrderTopic
还是有 4 个 Queue (Q1, Q2, Q3, Q4)。OrderGroup
只启动了 2 个实例 (C1, C2)。- 负载均衡后,分配结果可能是:
- C1 -> Q1, Q2
- C2 -> Q3, Q4
- 每个消费者实例需要处理两个队列的消息,只要它们的处理能力跟得上,也没问题。
场景三:消费者实例数 > 队列数(资源浪费!)
OrderTopic
只有 4 个 Queue (Q1, Q2, Q3, Q4)。OrderGroup
却启动了 6 个实例 (C1, C2, C3, C4, C5, C6)。- 负载均衡后,分配结果只能是:
- C1 -> Q1
- C2 -> Q2
- C3 -> Q3
- C4 -> Q4
- C5 -> (无队列可分配,处于空闲状态)
- C6 -> (无队列可分配,处于空闲状态)
- C5和C6无法消费任何消息,白白浪费了服务器资源。
RocketMQ如何保证消息不丢失?
- 生产者同步发送消息,失败重试,再失败告警+落库
- broke默认是异步刷盘+集群部署
- 消费完成ack确认
rocketMQ的消息堆积如何处理?
- 修改topic队列数,同时增加消费者数量
- 优化代码,提高业务处理速度
- 将topic落库或者写脚本转移到一个新的topic中。
消息消费失败后会怎么样
RocketMQ 在消息消费失败时,确实会将消息投送到重试队列。不过,在实际工作中,不需要主动同时监听原始 Topic 和对应的重试队列 Topic,因为 RocketMQ 的消费者客户端会自动处理重试队列的订阅。
当消息消费失败时,RocketMQ 会自动将消息重新投递到重试队列,。重试队列的命名格式通常是 %RETRY%+消费者组名(例如 %RETRY%my_consumer_group
),而不是直接与原始 Topic 名称关联
RocketMQ 的重试机制采用时间衰减策略,即重试间隔会随着重试次数的增加而不断延长。默认最大重试次数为 16 次。
如果超过最大重试次数消息仍未消费成功,该消息会被投递到死信队列(Dead-Letter Queue, DLQ),其命名格式为 %DLQ%+消费者组名
是否需要监听两个 Topic?
不需要,RocketMQ 的消费者客户端在设计上已经考虑到了重试逻辑。当你使用 DefaultMQPushConsumer
并订阅了原始 Topic(例如 Topic A)时,RocketMQ 客户端会自动为你订阅并监听对应的重试队列(%RETRY%<YourConsumerGroupName>
)
延迟消息
RocketMQ通过内置的延迟级别和定时调度机制来实现延迟消息,而不是用死信队列。 它提供了18个固定的延迟等级,比如等级14对应10分钟。我们下单支付的场景,设置等级14即可。
RocketMQ不支持任意时间的延迟,而是预设了18个固定的延迟等级(Level),每个等级对应一个固定的延迟时间。
延迟等级 (Level) | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
延迟时间 | 1s | 5s | 10s | 30s | 1m | 2m | 3m | 4m | 5m | 6m | 7m | 8m | 9m | 10m | 20m | 30m | 1h | 2h |
示例代码(Java):
java
Message msg = new Message("Order_Topic", "订单ID001", "订单超期的逻辑".getBytes());
// 设置延迟等级为14,即10分钟
msg.setDelayTimeLevel(14);
producer.send(msg);
核心流程:
- “生产者发送消息时指定一个DelayLevel。”
- “Broker收到后会把消息转移到对应Level的内部主题(SCHEDULE_TOPIC_XXX)暂存。”
- “Broker内部的ScheduleService会定时扫描这些内部主题,一旦发现消息到期,就将其恢复原主题和队列,重新投递。”
- “最终,消费者从原始主题消费到这条延迟消息,完成超时检查逻辑。”
如何自定义时间呢?mq只给了预设的18个时间级别,那么我们只能通过业务来实现了,可以将过期消息计算好携带到消息里面, 如果发现消息还没到期,就重新计算时间投递就行了。
什么情况下会导致消息重试
- 消费者已经消费,但是服务器(Broker)没有及时收到确认(ACK),导致Broker认为消息消费失败,从而将其重新投递或放回队列
- 消费者超时未响应(并发消费默认15分钟),Broker会重新投递该消息