当单体MySQL的写入吞吐量达到每秒几万次时,垂直扩展的成本和物理极限开始显现。我们的监控面板上,IOPS和CPU使用率曲线像心电图一样剧烈波动,每一次业务高峰都像是一次对SRE团队的压力测试。手动分片?我们尝试过。那是一段充满mysqldump
、凌晨维护窗口和祈祷数据一致性的黑暗时期。每一次扩容都需要数周的规划和数小时的停机,这在快速迭代的业务面前是不可接受的。我们需要一个自动化的、对业务透明的动态分片解决方案。
我们的目标是构建一个控制平面,它能在Kubernetes环境中自主管理数据库分片(我们称之为Shard)的生命周期,特别是实现自动化的分片分裂(Shard Split)以应对数据增长,整个过程对应用层的影响降到最低。
架构决策与权衡
我们的核心是设计一个能够协调复杂分布式状态的控制平面。技术选型围绕着一致性、可靠性和性能展开。
元数据存储: ZooKeeper
我们选择ZooKeeper作为分片元数据的唯一真实来源(Single Source of Truth)。元数据包括分片拓扑(哪个数据范围映射到哪个物理DB实例)、节点状态、以及在执行分裂等关键操作时所需的分布式锁。为什么不是etcd?虽然etcd与Kubernetes生态结合更紧密,但我们团队对ZK的运维经验更丰富,且其强一致性保证和Watcher机制完全满足我们的需求。在真实项目中,这种基于团队经验的决策往往比单纯的技术优劣更重要。运行时环境: Kubernetes (容器编排)
数据库实例本身(例如Percona Server for MySQL)将作为StatefulSet部署在Kubernetes中。这为我们提供了稳定的网络标识符和持久化存储卷,是管理有状态服务的基础。控制平面本身作为一个Deployment运行,通过Kubernetes API来动态地创建、销毁数据库Pod。路由信息高速缓存: Redis
应用或数据库中间件需要高频读取分片路由表。直接请求ZooKeeper会对其造成巨大压力,且其延迟对于在线交易类请求是无法接受的。因此,我们引入Redis作为路由表的写穿(Write-through)缓存。控制平面在更新ZooKeeper中的元数据后,会立即将最新的路由信息发布到Redis。客户端优先从Redis读取,失败或未命中时才回源到ZooKeeper,这是一种常见的保障性能与一致性的折衷方案。关键状态变更通知: AWS SNS
分片分裂是一个影响深远的操作。完成后,不仅内部监控系统需要知晓,下游的数据仓库ETL、BI报表系统也可能需要触发相应的任务。使用AWS SNS(Simple Notification Service)可以将状态变更事件以发布/订阅模式解耦出去。控制平面完成一次分裂后,只需向一个SNS Topic发送一条消息,所有关心此事件的下游服务都可以订阅该Topic并作出响应,而无需与控制平面直接耦合。
控制平面的核心实现
控制平面的核心是一个持续运行的调谐循环(Reconciliation Loop),它不断地将系统的实际状态调整为我们期望的状态。我们将使用Go语言来实现。
1. ZooKeeper中的元数据结构设计
良好的元数据结构是系统的基石。我们设计的ZNode路径如下:
/db-sharding-meta
├── /config
│ ├── max_shard_size_gb = 128
│ └── split_threshold_percent = 80
├── /shards
│ ├── /shard-0000000001
│ │ ├── range_start = 0
│ │ ├── range_end = 10000000
│ │ ├── state = "ACTIVE"
│ │ └── db_node = "mysql-shard-0.mysql-headless.default.svc.cluster.local"
│ ├── /shard-0000000002
│ │ └── ...
│ └── ...
├── /nodes
│ ├── /mysql-shard-0
│ │ ├── state = "HEALTHY"
│ │ └── size_gb = 110
│ └── ...
└── /locks
└── /resync_lock (EPHEMERAL)
-
/config
: 存储全局配置,如单个分片的最大容量、触发分裂的容量阈值。 -
/shards
: 包含所有分片的映射信息。每个分片是一个持久节点,其子节点包含数据范围、状态(如ACTIVE
,SPLITTING
,INACTIVE
)和对应的物理数据库地址。 -
/nodes
: 存储每个物理数据库Pod的运行时状态,由独立的探针程序定期更新。 -
/locks
: 用于分布式锁,确保任何时候只有一个控制平面实例在执行全局性的协调任务。
2. 控制器的主循环
控制器启动后,会定期(例如每分钟)执行一次检查。
package main
import (
"context"
"log"
"time"
"github.com/go-zookeeper/zk"
// ... other imports for kubernetes, redis, aws clients
)
const (
zkRootPath = "/db-sharding-meta"
resyncLockPath = zkRootPath + "/locks/resync_lock"
)
type Controller struct {
zkConn *zk.Conn
kubeClient kubernetes.Interface
redisClient *redis.Client
snsClient *sns.Client
clusterID string // Unique ID for this controller instance
}
func (c *Controller) Run(ctx context.Context) {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Println("Controller stopping...")
return
case <-ticker.C:
log.Println("Starting reconciliation loop...")
c.reconcile()
}
}
}
func (c *Controller) reconcile() {
// 1. 尝试获取分布式锁,防止多个控制器实例同时操作
lock := zk.NewLock(c.zkConn, resyncLockPath, zk.WorldACL(zk.PermAll))
err := lock.Lock()
if err == zk.ErrNoNode {
// 如果锁节点不存在,可能需要先创建
// 在生产环境中需要更复杂的锁获取逻辑
log.Println("Lock node does not exist, skipping this cycle.")
return
}
if err == zk.ErrDeadlock {
log.Println("Another controller instance holds the lock, skipping.")
return
}
if err != nil {
log.Printf("Error acquiring lock: %v", err)
return
}
defer lock.Unlock() // 确保循环结束时释放锁
log.Println("Lock acquired. Starting cluster state check.")
// 2. 从ZK获取所有物理节点的状态
nodes, err := c.getDBNodeStates()
if err != nil {
log.Printf("Failed to get node states from ZooKeeper: %v", err)
return
}
// 3. 检查是否有需要分裂的分片
for _, node := range nodes {
// 这里的配置是从ZK的/config路径读取的
maxSize := 128.0
splitThreshold := 0.80
if float64(node.SizeGB) > maxSize*splitThreshold {
log.Printf("Shard %s on node %s requires splitting. Size: %dGB", node.ShardID, node.ID, node.SizeGB)
// 核心分裂逻辑
err := c.initiateShardSplit(node.ShardID)
if err != nil {
log.Printf("Failed to initiate split for shard %s: %v", node.ShardID, err)
// 在真实项目中,这里需要有重试和告警机制
}
// 一次循环只处理一个分裂,避免雪崩效应
return
}
}
log.Println("Reconciliation loop finished. No action required.")
}
// ... getDBNodeStates 和 initiateShardSplit 的实现 ...
这段代码展示了控制器的核心骨架:定时触发、获取分布式锁、检查状态、触发操作。这里的坑在于,分布式锁的管理必须非常健壮,要处理好锁的创建、超时和持有者崩溃等异常情况。
3. 分片分裂的编排(Orchestration)
这是整个系统中最复杂的部分,一个典型的分裂流程包含多个步骤,必须保证原子性或最终一致性。
sequenceDiagram participant C as Controller participant ZK as ZooKeeper participant K8s as Kubernetes API participant DM as Data Migration Tool participant Redis participant SNS C->>ZK: 1. 在/shards/shard-X下设置 state="SPLITTING" ZK-->>C: OK C->>K8s: 2. 创建新的StatefulSet (e.g., mysql-shard-N) K8s-->>C: Pod is Provisioning C->>DM: 3. 启动数据迁移任务 (从旧Shard到新Shard) Note right of DM: 使用类似gh-ost或自研工具
进行在线数据迁移 DM-->>C: Migration In Progress... DM-->>C: Migration Complete, Lag is near zero C->>ZK: 4. 原子地更新路由表 (事务操作) Note right of ZK: - 更新旧Shard的range_end
- 创建新Shard的元数据
- 将旧Shard state改回"ACTIVE"
- 将新Shard state设为"ACTIVE" ZK-->>C: Transaction OK C->>Redis: 5. 清理旧缓存并发布新路由信息 Redis-->>C: OK C->>SNS: 6. 发布分片分裂成功事件 SNS-->>C: OK
分裂实现的关键代码片段:
// initiateShardSplit 触发分裂流程
func (c *Controller) initiateShardSplit(sourceShardID string) error {
// 0. 前置检查,确保该分片当前状态是ACTIVE
sourceShardMeta, version, err := c.getShardMeta(sourceShardID)
if err != nil || sourceShardMeta.State != "ACTIVE" {
return fmt.Errorf("shard %s is not in ACTIVE state or not found", sourceShardID)
}
// 1. 更新状态为SPLITTING,防止并发操作
sourceShardMeta.State = "SPLITTING"
_, err = c.zkConn.Set(zkShardPath(sourceShardID), mustMarshal(sourceShardMeta), version)
if err != nil {
return fmt.Errorf("failed to set shard %s state to SPLITTING: %v", sourceShardID, err)
}
// 为了简洁,这里省略了错误处理和回滚逻辑,实际生产代码中至关重要
defer func() {
// 无论成功与否,最终都应尝试将状态恢复
// ... recovery logic ...
}()
// 2. 规划新分片并创建K8s资源
newShardID, newDBNodeName, err := c.provisionNewShardResources()
if err != nil {
return err
}
// 3. 执行数据迁移
// 这是一个阻塞操作,可能需要数小时
// 在生产系统中,这应该是一个异步任务,控制器只负责触发和轮询状态
err = c.runDataMigration(sourceShardMeta.DBNode, newDBNodeName)
if err != nil {
return err
}
// 4. 原子更新ZK元数据
err = c.commitTopologyChange(sourceShardID, newShardID, sourceShardMeta)
if err != nil {
// 这是最关键的失败点,如果这里失败,可能导致数据不一致
// 需要有强大的回滚或者手动干预预案
return err
}
// 5. 更新Redis缓存
// ... c.updateRedisCache(...) ...
// 6. 发送SNS通知
err = c.publishSplitNotification(sourceShardID, newShardID)
if err != nil {
// 通知失败不应阻塞主流程,但必须记录日志并告警
log.Printf("WARN: Failed to publish SNS notification for shard split: %v", err)
}
log.Printf("Successfully split shard %s into %s and %s", sourceShardID, sourceShardID, newShardID)
return nil
}
// commitTopologyChange 使用ZooKeeper的事务来保证原子性
func (c *Controller) commitTopologyChange(oldShardID, newShardID string, oldShardMeta *ShardMeta) error {
// 计算分裂点,例如中间位置
splitPoint := oldShardMeta.RangeStart + (oldShardMeta.RangeEnd-oldShardMeta.RangeStart)/2
// 准备新旧分片的元数据
updatedOldShardMeta := ShardMeta{
RangeStart: oldShardMeta.RangeStart,
RangeEnd: splitPoint,
State: "ACTIVE",
DBNode: oldShardMeta.DBNode,
}
newShardMeta := ShardMeta{
RangeStart: splitPoint + 1,
RangeEnd: oldShardMeta.RangeEnd,
State: "ACTIVE",
DBNode: "new-db-node-address.svc.cluster.local", // 从provisioning步骤获取
}
// 构建ZK事务
ops := []interface{}{
&zk.SetDataRequest{Path: zkShardPath(oldShardID), Data: mustMarshal(updatedOldShardMeta), Version: -1}, // -1表示不关心版本
&zk.CreateRequest{Path: zkShardPath(newShardID), Data: mustMarshal(newShardMeta), Acl: zk.WorldACL(zk.PermAll), Flags: 0},
}
// 执行事务
_, err := c.zkConn.Multi(ops...)
return err
}
func (c *Controller) publishSplitNotification(oldShardID, newShardID string) error {
message := map[string]interface{}{
"eventType": "SHARD_SPLIT_COMPLETED",
"timestamp": time.Now().UTC().Format(time.RFC3339),
"details": map[string]string{
"sourceShard": oldShardID,
"newShard": newShardID,
},
}
msgBody, _ := json.Marshal(message)
input := &sns.PublishInput{
Message: aws.String(string(msgBody)),
TopicArn: aws.String("arn:aws:sns:us-east-1:123456789012:db-sharding-events"),
}
_, err := c.snsClient.Publish(input)
if err != nil {
log.Printf("Error publishing to SNS: %v", err)
}
return err
}
一个常见的错误是,在数据迁移完成后,先更新了Redis缓存,再更新ZooKeeper。如果ZK更新失败,缓存中就会存在脏数据,导致查询请求被路由到尚未准备好的新分片上。必须先原子地更新真实数据源(ZK),成功后再去更新或失效缓存(Redis)。
5. 单元测试思路
对这样一个复杂的系统,单元测试至关重要。我们不会去模拟完整的K8s或数据库,而是针对关键逻辑。
- 控制器逻辑测试: 使用一个内存中的ZooKeeper测试服务器(如
testing-zookeeper
库),模拟不同的集群状态(如某个分片超容),断言reconcile
函数是否正确地调用了initiateShardSplit
。 - 元数据事务测试: 专门测试
commitTopologyChange
函数,确保它生成的ZooKeeper事务操作是正确的,并且能处理各种边缘情况(如分裂点计算)。 - 状态机测试: 对分片的状态(
ACTIVE
,SPLITTING
等)转换进行测试,确保不会出现非法状态转换。
局限性与未来展望
我们构建的这个控制平面解决了自动化分片分裂的核心痛下,但在生产环境中,它还远非完美。
当前的方案存在几个明显的局限性:
- 分片合并(Shard Merging)的缺失:系统只能分裂,不能合并。对于数据量波动大的业务,可能会因为历史峰值产生大量低负载的“微小”分片,造成资源浪费。实现安全的在线分片合并,其复杂性远高于分裂。
- 热点问题:分裂策略仅基于数据总量,无法解决热点数据问题。一个拥有少量数据但访问极其频繁的分片,同样会成为瓶颈。未来的迭代需要引入基于QPS或CPU负载的动态调度和分裂策略。
- 数据迁移工具的依赖:整个流程的效率和可靠性高度依赖于底层的数据迁移工具。任何迁移过程中的抖动或失败,都需要控制平面具备复杂的重试和回滚机制,而这部分的实现工作量巨大。
- 跨可用区容灾:当前设计主要关注单集群内的扩展性。要实现跨AZ甚至跨Region的高可用,需要在元数据层面、数据库复制和流量路由上进行更复杂的设计。
下一步的演进方向可能是探索使用TiDB、CockroachDB这类原生支持弹性扩展的NewSQL数据库,从根本上将分片逻辑下沉到数据库内核中。但这需要巨大的数据迁移成本和技术栈转型。因此,在可预见的未来,持续迭代和加固我们这套自建的控制平面,使其更智能、更稳健,仍将是我们工作的重点。