| 问题 | 关键词/关键概念 | | -------------------- | ---------------------------------------------------------------------------------------------------------------- | | RabbitMQ如何防止重复消费?| **幂等设计**:唯一ID、数据库约束<br>**消息去重**:Redis/DB记录消息ID<br>**状态机**:业务状态控制<br>**分布式锁**:处理时加锁 | | RabbitMQ如何保证高可用?| **集群模式**:普通集群、镜像队列集群<br>**镜像队列**:多节点副本、主从同步<br>**Quorum队列(3.8+)**:Raft协议、强一致性<br>**Federation**:跨数据中心 | | RabbitMQ如何保证消息不丢?| **生产端**:confirm机制、事务、备份交换机<br>**存储端**:持久化、镜像队列、集群<br>**消费端**:手动ACK、重试机制、死信队列 | | RabbitMQ如何实现消费端限流?| **QoS设置**:basicQos(prefetchCount)<br>**预取数量**:控制未确认消息数<br>**手动ACK**:处理完再确认<br>**拒绝策略**:basicReject/basicNack | | RabbitMQ怎么做消息分发?| **轮询分发**:默认Round-Robin<br>**公平分发**:根据消费能力QoS<br>**消息路由**:Direct/Topic/Fanout/Headers<br>**优先级队列**:消息优先级 | | RabbitMQ如何实现延迟消息?| **TTL+死信**:消息过期进死信队列<br>**延迟插件**:rabbitmq_delayed_message_exchange<br>**定时任务**:扫描数据库发送 | | RabbitMQ整体架构?| **核心组件**:Producer→Exchange→Queue→Consumer<br>**节点结构**:Connection→Channel→Virtual Host<br>**存储层**:Mnesia数据库、消息持久化 | | 什么是死信队列?| **触发条件**:TTL过期、队列满、消费拒绝<br>**死信交换机**:x-dead-letter-exchange<br>**应用场景**:延迟队列、重试机制、异常处理 | | RabbitMQ事务机制?| **AMQP事务**:txSelect/txCommit/txRollback<br>**性能问题**:同步阻塞、吞吐量降250倍<br>**替代方案**:Publisher Confirm更高效 | | 如何保障消息一定发送到RabbitMQ?| **Confirm机制**:异步确认<br>**Return机制**:路由失败回调<br>**备份交换机**:Alternate Exchange<br>**重试机制**:失败重发 | # RabbitMQ核心架构详解 ## **1. 整体架构** ```Java 生产者(Producer) ↓ [Connection/Channel] Exchange(交换机) ├── Direct Exchange (精确匹配) ├── Topic Exchange (模式匹配) ├── Fanout Exchange (广播) └── Headers Exchange (头部匹配) ↓ [Binding/Routing Key] Queue(队列) ├── Classic Queue (经典队列) ├── Quorum Queue (仲裁队列) └── Stream Queue (流队列) ↓ [Push/Pull] 消费者(Consumer) 管理组件: ├── Virtual Host (虚拟主机隔离) ├── Connection (TCP连接) ├── Channel (信道复用) └── Mnesia (元数据存储) ``` ## **2. 高可用方案** ### **A. 镜像队列(Classic Mirrored)** ```bash # 设置镜像策略 rabbitmqctl set_policy ha-all "^" \ '{"ha-mode":"all","ha-sync-mode":"automatic"}' # ha-mode选项: - all: 所有节点 - exactly: 指定数量 - nodes: 指定节点 ``` ### **B. Quorum队列(推荐)** ```java // 声明Quorum队列 Map<String, Object> args = new HashMap<>(); args.put("x-queue-type", "quorum"); args.put("x-quorum-initial-group-size", 3); // 初始副本数 channel.queueDeclare("quorum-queue", true, false, false, args); // 优势: // - Raft协议保证一致性 // - 自动故障转移 // - 无需同步等待 ``` ### **C. 集群模式对比** |模式|优点|缺点|适用场景| |---|---|---|---| |**普通集群**|简单、扩展读能力|队列单点、数据不同步|开发测试| |**镜像队列**|数据冗余、自动故障转移|性能损耗、网络压力大|传统高可用| |**Quorum队列**|强一致、性能好|3.8+版本、功能受限|生产推荐| |**Federation**|跨数据中心、松耦合|最终一致、配置复杂|多数据中心| ## **3. 消息不丢失完整方案** ```java // 生产端确认 public class ReliableProducer { // 1. 开启Confirm模式 channel.confirmSelect(); // 2. 添加确认监听器 channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) { // 消息成功到达Exchange } @Override public void handleNack(long deliveryTag, boolean multiple) { // 消息未到达,需要重发 retrySend(deliveryTag); } }); // 3. 开启Return机制 channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) { // 消息未路由到队列,处理失败 handleRoutingFailure(body); } }); // 4. 发送持久化消息 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .deliveryMode(2) // 持久化 .messageId(UUID.randomUUID().toString()) .build(); channel.basicPublish(exchange, routingKey, true, props, message); } // 消费端确认 public class ReliableConsumer { // 手动ACK模式 channel.basicConsume(queueName, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { try { // 业务处理 processMessage(body); // 手动确认 channel.basicAck(envelope.getDeliveryTag(), false); } catch (Exception e) { // 处理失败,重新入队 channel.basicNack(envelope.getDeliveryTag(), false, true); } } }); } ``` ## **4. 消费端限流实现** ```java // QoS设置 channel.basicQos( 0, // prefetchSize: 消息大小限制,0表示不限制 10, // prefetchCount: 未确认消息数量 false // global: false表示Consumer级别,true表示Channel级别 ); // 实际效果: // - 每个消费者最多处理10条未确认消息 // - 超过限制后,RabbitMQ停止推送 // - 确认后继续推送 ``` ## **5. 防止重复消费** ```java @Component public class IdempotentConsumer { @Autowired private RedisTemplate<String, String> redisTemplate; @RabbitListener(queues = "order.queue") public void handleMessage(Message message) { String messageId = message.getMessageProperties().getMessageId(); // 1. 分布式锁防并发 String lockKey = "lock:" + messageId; Boolean locked = redisTemplate.opsForValue() .setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS); if (!locked) { return; // 其他实例正在处理 } try { // 2. 幂等性检查 String processKey = "processed:" + messageId; Boolean isNew = redisTemplate.opsForValue() .setIfAbsent(processKey, "1", 24, TimeUnit.HOURS); if (!isNew) { return; // 已处理过 } // 3. 业务处理 processOrder(message); } finally { redisTemplate.delete(lockKey); } } } ``` ## **6. 消息分发策略** ```java // 1. 轮询分发(默认) channel.basicConsume(queueName, autoAck, consumer); // 2. 公平分发 channel.basicQos(1); // 每次只发1条 channel.basicConsume(queueName, false, consumer); // 手动ACK // 3. 基于优先级 Map<String, Object> args = new HashMap<>(); args.put("x-max-priority", 10); // 最大优先级 channel.queueDeclare("priority-queue", true, false, false, args); // 发送带优先级的消息 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .priority(5) // 消息优先级 .build(); ``` ## **7. 延迟消息实现** ### **方案1: TTL + 死信队列** ```java // 1. 声明死信交换机和队列 channel.exchangeDeclare("dlx.exchange", "direct"); channel.queueDeclare("dlx.queue", true, false, false, null); // 2. 声明延迟队列(带TTL和死信配置) Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", 10000); // 10秒过期 args.put("x-dead-letter-exchange", "dlx.exchange"); args.put("x-dead-letter-routing-key", "dlx.routing.key"); channel.queueDeclare("delay.queue", true, false, false, args); // 3. 发送延迟消息 channel.basicPublish("", "delay.queue", props, message); ``` ### **方案2: 延迟插件** ```java // 安装插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange // 声明延迟交换机 Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); channel.exchangeDeclare("delayed.exchange", "x-delayed-message", true, false, args); // 发送延迟消息 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .headers(Collections.singletonMap("x-delay", 5000)) // 延迟5秒 .build(); channel.basicPublish("delayed.exchange", routingKey, props, message); ``` ## **8. 死信队列(DLX)详解** ```java // 死信触发条件: // 1. 消息TTL过期 // 2. 队列达到最大长度 // 3. 消费者拒绝(requeue=false) // 配置示例 Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "dlx.exchange"); args.put("x-dead-letter-routing-key", "dlx.key"); args.put("x-message-ttl", 60000); // 60秒过期 args.put("x-max-length", 1000); // 最大1000条 channel.queueDeclare("business.queue", true, false, false, args); ``` ## **9. 事务 vs Confirm对比** |特性|事务机制|Confirm机制| |---|---|---| |**性能**|极差(降250倍)|好(异步)| |**同步性**|同步阻塞|异步回调| |**批量处理**|不支持|支持批量确认| |**使用复杂度**|简单|稍复杂| |**推荐度**|不推荐|生产推荐| ## **10. 最佳实践** ```yaml # 生产端最佳实践 1. 开启Publisher Confirm 2. 开启Publisher Return 3. 消息持久化 4. 设置消息ID去重 5. 实现重试机制 # 消费端最佳实践 1. 手动ACK确认 2. 设置QoS限流 3. 幂等性设计 4. 异常重试策略 5. 死信队列兜底 # 运维最佳实践 1. 使用Quorum队列 2. 监控队列积压 3. 设置内存/磁盘告警 4. 定期备份数据 5. 合理的TTL策略 ```