Skip to content
鼓励作者:欢迎打赏犒劳

MQ

RocketMQ

重要概念

  • Topic(主题)
  • Queue(队列):一个Topic由多个Queue组成,消息被均匀地分布在这些Queue中。队列是消息存储和传输的实际载体。
  • Consumer Group(消费者组)
  • Consumer Instance(消费者实例)

一个Queue只能被同一个Consumer Group下的一个Consumer Instance消费。但一个Consumer Instance可以同时消费多个Queue。

队列是什么?和Topic有什么关系?

text
队列(Queue) 是Topic的分区(Partition) 单位。当你创建一个Topic时,必须指定它包含多少个Queue(例如8个、16个)。

消息不是直接存在Topic里的,而是存储在Topic下的各个Queue中。生产者发送消息到某个Topic,这条消息会根据一定的策略(如轮询、哈希)被路由到该Topic下的某一个Queue里。

关系:Topic是逻辑概念,Queue是物理概念。Topic是逻辑上的消息分类,而Queue是实际承载消息的实体。可以把Topic理解成一个由多个Queue组成的逻辑队列。

在创建Topic的时候要指定队列么?

text
是的,必须指定。 这是在创建Topic时最重要的参数之一(通常在管理控制台或通过命令创建时指定)。
例如,创建命令通常是这样的:

sh mqadmin updateTopic -n 127.0.0.1:9876 -c DefaultCluster -t YOUR_TOPIC_NAME -r 8 -w 8

这里的 -r 8 -w 8 通常就表示读写队列数各为8个。

RocketMQ消费模式有几种?

消费模型由Consumer决定,消费维度为Topic。

  1. 集群消费
  • 一条消息只会被同Group中的一个Consumer消费
  • 多个Group同时消费一个Topic时,每个Group都会有一个Consumer消费到数据
  1. 广播消费

消息将对一 个Consumer Group 下的各个 Consumer 实例都消费一遍。即即使这些 Consumer 属于同一个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一次。

RocketMQ消费消息是push还是pull?

RocketMQ没有真正意义的push,都是pull,虽然有push类,但实际底层实现采用的是长轮询机制,即拉取方式

broker端属性 longPollingEnable 标记是否开启长轮询。默认开启

一个队列只能被一个同Group的消费者实例消费如何理解?

这意味着对于同一个消费者组(Consumer Group)来说,它的所有实例会共同协商(通过RocketMQ的负载均衡机制)来分配某个Topic下的所有Queue。

  • 场景一:消费者实例数 = 队列数(理想状态)

    • Topic OrderTopic 有 4 个 Queue (Q1, Q2, Q3, Q4)。
    • 你的消费者组 OrderGroup 启动了 4 个实例 (C1, C2, C3, C4)。
    • 负载均衡后,分配结果是:
      • C1 -> Q1
      • C2 -> Q2
      • C3 -> Q3
      • C4 -> Q4
    • 每个消费者实例都“认领”了一个队列,分工明确,效率最高。
  • 场景二:消费者实例数 < 队列数(很常见)

    • OrderTopic 还是有 4 个 Queue (Q1, Q2, Q3, Q4)。
    • OrderGroup 只启动了 2 个实例 (C1, C2)。
    • 负载均衡后,分配结果可能是:
      • C1 -> Q1, Q2
      • C2 -> Q3, Q4
    • 每个消费者实例需要处理两个队列的消息,只要它们的处理能力跟得上,也没问题。
  • 场景三:消费者实例数 > 队列数(资源浪费!)

    • OrderTopic 只有 4 个 Queue (Q1, Q2, Q3, Q4)。
    • OrderGroup 却启动了 6 个实例 (C1, C2, C3, C4, C5, C6)。
    • 负载均衡后,分配结果只能是:
      • C1 -> Q1
      • C2 -> Q2
      • C3 -> Q3
      • C4 -> Q4
      • C5 -> (无队列可分配,处于空闲状态)
      • C6 -> (无队列可分配,处于空闲状态)
    • C5和C6无法消费任何消息,白白浪费了服务器资源。

RocketMQ如何保证消息不丢失?

  1. 生产者同步发送消息,失败重试,再失败告警+落库
  2. broke默认是异步刷盘+集群部署
  3. 消费完成ack确认

rocketMQ的消息堆积如何处理?

  1. 修改topic队列数,同时增加消费者数量
  2. 优化代码,提高业务处理速度
  3. 将topic落库或者写脚本转移到一个新的topic中。

消息消费失败后会怎么样

RocketMQ 在消息消费失败时,确实会将消息投送到重试队列。不过,在实际工作中,不需要主动同时监听原始 Topic 和对应的重试队列 Topic,因为 RocketMQ 的消费者客户端会自动处理重试队列的订阅。

当消息消费失败时,RocketMQ 会自动将消息重新投递到重试队列,。重试队列的命名格式通常是 %RETRY%+消费者组名(例如 %RETRY%my_consumer_group),而不是直接与原始 Topic 名称关联

RocketMQ 的重试机制采用时间衰减策略,即重试间隔会随着重试次数的增加而不断延长。默认最大重试次数为 16 次。

如果超过最大重试次数消息仍未消费成功,该消息会被投递到死信队列(Dead-Letter Queue, DLQ),其命名格式为 %DLQ%+消费者组名

是否需要监听两个 Topic?

不需要,RocketMQ 的消费者客户端在设计上已经考虑到了重试逻辑。当你使用 DefaultMQPushConsumer 并订阅了原始 Topic(例如 Topic A)时,RocketMQ 客户端会自动为你订阅并监听对应的重试队列(%RETRY%<YourConsumerGroupName>

延迟消息

RocketMQ通过内置的延迟级别和定时调度机制来实现延迟消息,而不是用死信队列。 它提供了18个固定的延迟等级,比如等级14对应10分钟。我们下单支付的场景,设置等级14即可。

RocketMQ不支持任意时间的延迟,而是预设了18个固定的延迟等级(Level),每个等级对应一个固定的延迟时间。

延迟等级 (Level)123456789101112131415161718
延迟时间1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h

示例代码(Java):

java
Message msg = new Message("Order_Topic", "订单ID001", "订单超期的逻辑".getBytes());
// 设置延迟等级为14,即10分钟
msg.setDelayTimeLevel(14);
producer.send(msg);

核心流程

  • “生产者发送消息时指定一个DelayLevel。”
  • “Broker收到后会把消息转移到对应Level的内部主题(SCHEDULE_TOPIC_XXX)暂存。”
  • “Broker内部的ScheduleService会定时扫描这些内部主题,一旦发现消息到期,就将其恢复原主题和队列,重新投递。”
  • “最终,消费者从原始主题消费到这条延迟消息,完成超时检查逻辑。”

如何自定义时间呢?mq只给了预设的18个时间级别,那么我们只能通过业务来实现了,可以将过期消息计算好携带到消息里面, 如果发现消息还没到期,就重新计算时间投递就行了。

什么情况下会导致消息重试

  1. 消费者已经消费,但是服务器(Broker)没有及时收到确认(ACK),导致Broker认为消息消费失败,从而将其重新投递或放回队列
  2. 消费者超时未响应(并发消费默认15分钟),Broker会重新投递该消息

如有转载或 CV 的请标注本站原文地址