业务的连续性要求我们的核心消息链路具备地域级容灾能力。当单一云服务商的某个区域(Region)出现整体故障时,服务必须能够在另一个区域甚至另一个云服务商的基础设施上快速恢复。摆在面前的技术决策是:如何构建一个跨越不同云厂商(例如AWS和阿里云)的、统一的、高可用的消息中间件集群。
定义问题与技术选型
核心诉旗:
- RPO (Recovery Point Objective) < 1分钟: 灾难发生时,最多允许丢失1分钟内的数据。
- RTO (Recovery Time Objective) < 10分钟: 灾难发生后,消息的生产和消费必须在10分钟内恢复。
- 规避厂商锁定: 架构必须是云中立的,不能深度绑定任何一家云厂商的专有消息服务(PaaS)。
- 应用透明: 对上游生产者和下游消费者来说,主备切换过程应尽可能透明,理想情况下无需修改配置和重启应用。
方案A:组合使用云厂商托管服务
一个直接的思路是分别采用两家云厂商的托管消息队列,例如在AWS使用Amazon MQ,在阿里云使用消息队列ONS(商业版RocketMQ),然后通过应用层逻辑或自研数据同步管道进行双向数据复制。
优势:
- 运维成本极低,云厂商负责底层实例的维护、补丁和高可用。
- 开箱即用,可以快速搭建起来。
劣势:
- 数据同步是噩梦: 异构消息系统(即使底层都是RocketMQ,PaaS版本也存在API和特性差异)之间的实时同步非常困难,难以保证一致性和低延迟。自研同步方案本身就是一个复杂的分布式系统,违背了使用PaaS的初衷。
- API不统一: 生产者和消费者的SDK、认证方式、监控指标均不相同,需要开发适配层,增加了应用端的复杂性。
- 无法实现透明切换: 故障切换时,生产者和消费者几乎必然需要更改连接地址、认证信息等配置,难以满足RTO和应用透明的要求。
- 深度厂商锁定: 这恰恰是我们试图规避的核心问题。
结论:此方案在技术上可行性差,且与核心目标“规避厂商锁定”背道而驰,直接否决。
方案B:基于云主机自建跨云RocketMQ集群
该方案的核心是在两家云厂商的VPC(Virtual Private Cloud)内,使用基础的计算实例(如EC2和ECS)自行部署和管理一套统一的RocketMQ集群。利用RocketMQ 4.5+版本引入的Dledger机制,构建一个跨数据中心的、具备自动主从切换能力的存储层。
优势:
- 技术栈统一: 无论是生产者、消费者还是运维工具,面对的都是同一套标准的Apache RocketMQ,不存在API兼容性问题。
- 规避厂商锁定: 底层依赖的是最标准的计算、存储和网络资源,具备在任何云环境之间迁移的能力。
- 原生高可用: Dledger基于Raft协议,能够为Broker组提供崩溃自愈和主节点自动选举的能力,这是实现低RTO的关键。
劣势:
- 运维复杂度高: 需要自行负责集群的部署、监控、升级和故障排查。
- 跨云网络成本与延迟: 需要打通两个云厂商VPC之间的网络,这通常通过VPN网关或专线实现,会产生额外的网络费用和不可忽视的延迟。
- 对技术团队要求高: 团队需要对RocketMQ的内部原理,特别是Dledger的运行机制有深入理解。
最终决策:
尽管方案B的运维成本更高,但它完美契合了我们的核心诉求:技术统一、规避锁定、以及实现真正的业务连续性。长远来看,掌控核心中间件的能力所带来的架构灵活性和自主性,其价值远超短期内增加的运维负担。我们选择方案B。
核心实现概览
我们将构建一个跨越AWS ap-northeast-1
(东京)区域和阿里云 cn-hangzhou
(杭州)区域的RocketMQ集群。
架构设计
整体架构依赖于RocketMQ自身的组件来完成跨云协同。
graph TD subgraph "AWS (ap-northeast-1)" subgraph "VPC-AWS" ProducerA[Producer App] -->|TCP| NS1_AWS(NameServer 1) ConsumerA[Consumer App] -->|TCP| NS1_AWS ProducerA -->|TCP| B1_M_AWS(Broker 1 / Master) ConsumerA -->|TCP| B1_M_AWS NS1_AWS <--> NS2_AWS(NameServer 2) NS1_AWS -.-> B1_M_AWS NS2_AWS -.-> B1_M_AWS end end subgraph "阿里云 (cn-hangzhou)" subgraph "VPC-ALI" ProducerB[Producer App] -->|TCP| NS3_ALI(NameServer 3) ConsumerB[Consumer App] -->|TCP| NS3_ALI ProducerB -->|TCP| B1_S_ALI(Broker 1 / Slave) ConsumerB -->|TCP| B1_S_ALI NS3_ALI <--> NS4_ALI(NameServer 4) NS3_ALI -.-> B1_S_ALI NS4_ALI -.-> B1_S_ALI end end subgraph "跨云高速网络 (VPN/专线)" VPC-AWS <===> VPC-ALI end subgraph "Dledger Raft Group (for Broker-1)" B1_M_AWS -- Raft Log Replication --> B1_S_ALI B1_S_ALI -- Raft Log Replication --> B1_M_AWS end ProducerA -.->|namesrv: ns1,ns2,ns3,ns4| ProducerB ConsumerA -.->|namesrv: ns1,ns2,ns3,ns4| ConsumerB style B1_M_AWS fill:#8f8,stroke:#333,stroke-width:2px style B1_S_ALI fill:#f9f,stroke:#333,stroke-width:2px
架构关键点:
- NameServer部署: 在AWS和阿里云两地都部署NameServer集群。所有客户端(Producer/Consumer)的配置中,
namesrvAddr
列表必须包含所有地域的NameServer地址。这是实现故障自动切换的基石。 - Broker部署: 我们以一个Broker组(例如
broker-a
)为例。该组包含两个节点,一个部署在AWS(作为初始Master),另一个部署在阿里云(作为初始Slave/Follower)。 - Dledger (Raft) 组: 这两个Broker节点共同组成一个Dledger Raft组。消息的写入必须得到Raft组中多数节点(在这里是2个节点中的2个,或者3个节点中的2个,生产环境推荐3节点)的确认,从而保证数据在写入主节点后,被同步复制到了从节点。
- 跨云网络: AWS VPC和阿里云VPC之间必须建立稳定、低延迟的私网连接。实际项目中,这通常通过各自云厂商提供的VPN网关或更高速的专线/云连接服务来实现。安全组和网络ACL必须精确地开放RocketMQ所需端口(
9876
for NameServer,10911
for Broker VIP channel,10909
for Broker a-la-carte channel, 以及Dledger通信端口)。
关键配置与代码实现
这里的坑在于,官方文档对于跨公网或复杂网络环境下的Dledger配置描述不多。核心是确保每个Broker节点都能通过其对端可访问的IP地址进行注册和通信。
1. Broker节点配置
假设我们规划的IP如下:
- AWS Broker:
- 内网IP:
172.16.1.10
- 对端(阿里云)可访问的IP(VPN或专线映射后的地址):
10.10.1.10
- 内网IP:
- 阿里云 Broker:
- 内网IP:
192.168.1.20
- 对端(AWS)可访问的IP:
10.10.2.20
- 内网IP:
- Dledger通信端口:
40911
AWS Broker (broker-a
, brokerId=0
, Master) 的配置文件 broker-aws.conf
:
# 所属集群名字
brokerClusterName = CrossCloudRocketMQCluster
# broker名字,同一个Raft组的brokerName必须一致
brokerName = broker-a
# brokerId=0表示Master, >0表示Slave。但在Dledger模式下此项不起作用,Leader由Raft选举产生。
brokerId = 0
# NameServer地址列表, 包含所有节点
namesrvAddr = 172.16.0.5:9876;172.16.0.6:9876;192.168.0.5:9876;192.168.0.6:9876
# --- Dledger 核心配置 ---
# 开启Dledger作为存储引擎
enableDLegerCommitLog = true
# Dledger组名,与brokerName保持一致
dledgerGroup = broker-a
# Raft组内所有成员的地址,格式为 "n_id-peer_ip:port;..."
# 这里的IP必须是对端可以访问的IP。
# n0 是 AWS Broker, n1 是阿里云 Broker
dledgerPeers = n0-10.10.1.10:40911;n1-10.10.2.20:40911
# 声明自己是哪个节点
dledgerSelfId = n0
# broker对外服务地址,必须配置为客户端可以访问的地址
# 在云环境中,通常是内网IP
brokerIP1 = 172.16.1.10
# 消息存储路径
storePathRootDir = /data/rocketmq/store
storePathCommitLog = /data/rocketmq/store/commitlog
storePathConsumeQueue = /data/rocketmq/store/consumequeue
storePathIndex = /data/rocketmq/store/index
storePathCheckpoint = /data/rocketmq/store/checkpoint
abortFile = /data/rocketmq/store/abort
# 自动创建Topic
autoCreateTopicEnable = true
# Broker角色。Dledger模式下,推荐设置为ASYNC_MASTER,因为主备同步由Raft保证
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
阿里云 Broker (broker-a
, brokerId
不起作用) 的配置文件 broker-ali.conf
:
brokerClusterName = CrossCloudRocketMQCluster
brokerName = broker-a
brokerId = 0
namesrvAddr = 172.16.0.5:9876;172.16.0.6:9876;192.168.0.5:9876;192.168.0.6:9876
# --- Dledger 核心配置 ---
enableDLegerCommitLog = true
dledgerGroup = broker-a
# Raft组配置与AWS节点完全一致
dledgerPeers = n0-10.10.1.10:40911;n1-10.10.2.20:40911
# 声明自己是n1节点
dledgerSelfId = n1
brokerIP1 = 192.168.1.20
# 存储路径等其他配置与AWS节点类似
storePathRootDir = /data/rocketmq/store
# ... (其他路径配置)
autoCreateTopicEnable = true
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
配置要点剖析:
-
dledgerPeers
: 这是整个跨云配置的核心。这里的IP地址必须是Raft组成员之间相互通信用的地址。如果两边VPC通过NAT网关或VPN连接,这里就要填写对端路由可达的IP,而不是机器本身的私有IP。 -
dledgerSelfId
: 正确标识自己是Raft组中的哪个成员。 -
namesrvAddr
: 必须是全集。客户端通过查询任何一个NameServer,都能获取到整个集群(包括所有Broker)的路由信息。当某个Broker Master宕机,Dledger选举出新的Master后,新的Master会向所有NameServer上报心跳,更新自己的角色信息。客户端的SDK会定期从NameServer拉取最新的路由表,从而感知到Master已经切换,并将后续请求发送到新的Master。
2. 生产级生产者代码
一个常见的错误是,在生产者代码中只配置了当前区域的NameServer。这在区域故障时会导致无法连接到备用区域的NameServer,从而无法获取新的Broker路由。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
public class CrossCloudProducer {
private static final Logger logger = LoggerFactory.getLogger(CrossCloudProducer.class);
// 关键:NameServer地址列表包含所有云的所有节点
private static final String NAMESRV_ADDR = "172.16.0.5:9876;172.16.0.6:9876;192.168.0.5:9876;192.168.0.6:9876";
private static final String PRODUCER_GROUP = "P_cross_cloud_order_group";
private static final String TOPIC = "T_cross_cloud_orders";
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
// 设置NameServer地址
producer.setNamesrvAddr(NAMESRV_ADDR);
// 实例名,建议设置为PID@Hostname,方便排查问题
producer.setInstanceName("Producer-" + UUID.randomUUID());
// 生产环境重试配置
// 如果发送失败,同步模式下内部重试2次
producer.setRetryTimesWhenSendFailed(2);
// 发送超时时间,默认3000ms
producer.setSendMsgTimeout(5000);
try {
producer.start();
logger.info("Producer started.");
for (int i = 0; i < 100; i++) {
String orderId = "Order_" + System.currentTimeMillis();
String body = "{\"orderId\": \"" + orderId + "\", \"amount\": 100.0, \"timestamp\": " + System.currentTimeMillis() + "}";
Message msg = new Message(TOPIC, "PlaceOrder", orderId, body.getBytes(StandardCharsets.UTF_8));
try {
SendResult sendResult = producer.send(msg);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
logger.info("Send message successfully. MsgId: {}, Broker: {}",
sendResult.getMsgId(), sendResult.getMessageQueue().getBrokerName());
} else {
logger.warn("Send message with status: {}. Result: {}", sendResult.getSendStatus(), sendResult);
}
} catch (Exception e) {
// 记录发送失败的日志,后续可能需要人工补偿
logger.error("Failed to send message: " + body, e);
// 避免在循环中快速失败,可以加入短暂休眠
Thread.sleep(1000);
}
// 模拟业务流量
Thread.sleep(500);
}
} catch (Exception e) {
logger.error("Producer failed to start or run.", e);
} finally {
// 在应用关闭时,务必关闭producer,释放资源
producer.shutdown();
logger.info("Producer shutdown.");
}
}
}
3. 生产级消费者代码
消费者的配置同理,namesrvAddr
也必须是全集。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
public class CrossCloudConsumer {
private static final Logger logger = LoggerFactory.getLogger(CrossCloudConsumer.class);
private static final String NAMESRV_ADDR = "172.16.0.5:9876;172.16.0.6:9876;192.168.0.5:9876;192.168.0.6:9876";
private static final String CONSUMER_GROUP = "C_cross_cloud_order_processor";
private static final String TOPIC = "T_cross_cloud_orders";
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.setNamesrvAddr(NAMESRV_ADDR);
consumer.setInstanceName("Consumer-" + UUID.randomUUID());
// 设置从哪里开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 订阅Topic,"*"表示订阅所有Tag
consumer.subscribe(TOPIC, "*");
// 注册消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
// 真实项目中,这里应有幂等性处理逻辑
MessageExt msg = msgs.get(0); // 默认批处理大小为1
String body = new String(msg.getBody(), StandardCharsets.UTF_8);
logger.info("Received message: MsgId: {}, Topic: {}, Body: {}", msg.getMsgId(), msg.getTopic(), body);
// 模拟业务处理
// processOrder(body);
// 如果业务处理成功,返回CONSUME_SUCCESS
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 如果处理失败,记录日志,并要求稍后重试
logger.error("Consume message failed. msgs: {}", msgs, e);
// 返回RECONSUME_LATER,消息将会在稍后被重新投递
// 注意配置重试次数,避免无限重试导致消息积压
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
// 启动消费者
consumer.start();
logger.info("Consumer Started.");
// 添加JVM关闭钩子,确保消费者正常关闭
Runtime.getRuntime().addShutdownHook(new Thread(consumer::shutdown));
}
}
架构的局限性与未来展望
这套架构并非银弹,它有明确的适用边界和需要正视的挑战。
首先,跨地域网络延迟是物理瓶颈。如果AWS东京和阿里云杭州之间的网络延迟达到50ms,那么对于要求同步发送并确认的消息,其RT(响应时间)至少会增加50ms。Dledger的Raft协议要求日志同步到多数派,这意味着每次写入的延迟都受限于主备节点间的网络RTT。因此,该架构更适用于对延迟不极端敏感,但对数据可靠性要求极高的异步化业务场景。
其次,运维成本是现实的。自建集群意味着需要投入人力进行监控(Prometheus + Grafana)、告警、容量规划、版本升级和安全加固。一套完善的自动化运维体系是这套架构能稳定运行的前提,可以考虑基于Kubernetes和RocketMQ Operator来简化部署和管理。
最后,两节点的Raft组存在脑裂风险。在生产环境中,强烈建议部署三节点Raft组(例如,两节点在主可用区,一节点在备用可用区),这样即使有一个节点或网络分区,只要多数派(2/3)仍然连通,集群就能继续提供服务,容错能力更强。
未来的迭代方向将聚焦于运维自动化和韧性增强。引入混沌工程实践,定期模拟单云厂商区域故障、跨云网络中断等场景,验证这套灾备体系的自动愈合能力是否符合预期,并不断优化故障发现、切换和恢复的流程,确保RTO和RPO在真实故障中能够得到保障。