在一个按用户ID分片的微服务体系中,一个看似简单的用户注册流程暴露了架构的裂缝。该流程要求原子性地完成两个操作:在users
分片库中创建核心用户记录,并在accounts
分片库中初始化用户的资金账户。这两个库物理隔离,传统的本地ACID事务在此场景下无能为力。这正是典型的分布式事务问题。
方案A:独立的事务协调器服务
最初的构想是引入一个独立的“事务协调器”(Transaction Coordinator)服务。应用服务(例如 user-service
)不直接执行业务逻辑,而是向这个协调器发起一个事务请求。协调器内部维护一个状态机,按预定顺序通过RPC调用各个参与方服务(user-service
的createUser
接口,account-service
的createAccount
接口)。
优势分析:
- 逻辑集中:分布式事务的复杂逻辑(重试、补偿)被封装在协调器中,业务服务相对纯粹。
- 理解直接:这种命令式的、工作流引擎式的模型符合直觉,易于初期开发团队理解。
劣势分析:
- 状态管理复杂:协调器自身必须是高可用的,其内部的事务状态持久化(例如存入Redis或独立数据库)成为了新的关键瓶颈和故障点。协调器若在事务执行中途崩溃,状态恢复将是一场灾难。
- 紧耦合与侵入性:业务服务需要适配协调器的API,甚至可能需要改变接口来支持重试和补偿的幂等性要求。整个系统的调用链路从
A -> B
变成了A -> Coordinator -> B
,增加了网络开销和延迟。 - 可观测性差:一个正在进行中的事务状态是什么?卡在了哪一步?排查问题需要深入协调器的日志或其内部状态库,缺乏统一和直观的观测手段。在一次代码审查中,我们发现追踪单个失败的注册流程需要关联三个服务的日志,效率极低。
在真实项目中,这种独立的协调器服务往往会演变成一个难以维护的“巨石”。它的稳定性直接决定了整个系统的核心业务可用性。
方案B:基于Kubernetes CRD的声明式协调器
既然我们的服务环境已经全面容器化并由Kubernetes编排,为什么不利用Kubernetes自身的能力来管理事务状态?这个思路的核心是将分布式事务本身也看作一种“资源”。
我们可以定义一个自定义资源(Custom Resource Definition, CRD),名为 SagaTransaction
。当需要执行一个分布式事务时,应用服务不再是调用另一个服务,而是使用Kubernetes API创建一个SagaTransaction
资源实例。然后,一个专门为此CRD编写的Operator(控制器)会监听到这个资源的创建,并像Kubernetes管理Pod一样,驱动这个事务“资源”达到其最终状态。
优势分析:
- 声明式API:开发者只需描述事务的“期望状态”(包含哪些步骤、对应的补偿操作是什么),而无需关心如何执行。这与Kubernetes的设计哲学完全一致。
- 利用原生高可用:Operator本身可以部署多个副本,利用Kubernetes的leader election机制实现高可用。事务的状态直接记录在
SagaTransaction
资源的status
字段中,由etcd保证持久化和一致性,我们无需自己维护一个高可用的状态存储。 - 卓越的可观测性:任何有
kubectl
权限的人都可以通过kubectl get sagatransaction <tx-id> -o yaml
清晰地看到事务的当前状态、执行到哪一步、失败原因等。这极大地简化了调试和运维。 - 解耦与非侵入性:业务服务只需拥有创建Kubernetes资源的RBAC权限即可,与协调逻辑完全解耦。
劣势分析:
- 开发门槛:编写一个健壮的Kubernetes Operator比编写一个简单的业务服务要复杂得多,需要对Kubernetes的控制器模型、调谐循环(Reconciliation Loop)和API交互有深入理解。
- etcd压力:频繁创建和更新事务状态会对Kubernetes API Server和其后的etcd造成压力。对于高频事务场景,需要评估其性能影响。
最终选择:
尽管方案B的初始开发成本更高,但从长期可维护性、系统韧性和运维便利性来看,其优势是压倒性的。我们将分布式事务的管理,从应用层面的问题,下沉到了平台基础设施层面。我们决定采用此方案,并使用Node.js生态(TypeScript)来构建这个Operator,以便团队可以利用现有的JavaScript技术栈。
核心实现概览
1. CRD定义 (sagatransaction.yaml
)
这是我们声明式API的基石。spec
部分描述了事务的意图,status
部分则由Operator填充,反映了事务的真实世界状态。
# crd/sagatransaction.core.tech/v1alpha1.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: sagatransactions.core.tech
spec:
group: core.tech
versions:
- name: v1alpha1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
steps:
type: array
items:
type: object
required: ["name", "action", "compensate"]
properties:
name:
type: string
description: "A unique name for the step."
action:
type: object
description: "The forward operation to execute."
properties:
endpoint:
type: string
method:
type: string
enum: ["POST", "PUT", "DELETE"]
payload:
type: object
x-kubernetes-preserve-unknown-fields: true
compensate:
type: object
description: "The compensating operation."
properties:
endpoint:
type: string
method:
type: string
enum: ["POST", "PUT", "DELETE"]
payload:
type: object
x-kubernetes-preserve-unknown-fields: true
required: ["steps"]
status:
type: object
properties:
phase:
type: string
enum: ["Pending", "Processing", "Compensating", "Succeeded", "Failed"]
currentStep:
type: string
completedSteps:
type: array
items:
type: string
lastErrorMessage:
type: string
retries:
type: integer
scope: Namespaced
names:
plural: sagatransactions
singular: sagatransaction
kind: SagaTransaction
shortNames:
- saga
一个关键的设计是在spec.steps.action.payload
和spec.steps.compensate.payload
中使用了x-kubernetes-preserve-unknown-fields: true
,这允许我们传递任意结构的JSON对象作为服务调用的请求体。
2. Operator核心调谐逻辑 (TypeScript)
我们使用 @kubernetes/client-node
库与Kubernetes API进行交互。整个Operator的核心是一个无限循环的调谐函数(Reconciliation Loop)。
// src/reconciler.ts
import * as k8s from '@kubernetes/client-node';
import axios from 'axios';
import { V1ObjectMeta } from '@kubernetes/client-node';
// 定义CRD的类型,为了简洁这里省略了完整定义
interface SagaStep {
name: string;
action: { endpoint: string; method: 'POST' | 'PUT' | 'DELETE'; payload: object };
compensate: { endpoint: string; method: 'POST' | 'PUT' | 'DELETE'; payload: object };
}
interface SagaTransactionSpec {
steps: SagaStep[];
}
interface SagaTransactionStatus {
phase?: 'Pending' | 'Processing' | 'Compensating' | 'Succeeded' | 'Failed';
currentStep?: string;
completedSteps?: string[];
lastErrorMessage?: string;
retries?: number;
}
interface SagaTransaction extends k8s.KubernetesObject {
spec: SagaTransactionSpec;
status?: SagaTransactionStatus;
}
const CUSTOM_GROUP = 'core.tech';
const CUSTOM_VERSION = 'v1alpha1';
const CUSTOM_PLURAL = 'sagatransactions';
export class SagaReconciler {
private readonly k8sApi: k8s.KubeConfig;
private readonly customObjectsApi: k8s.CustomObjectsApi;
constructor() {
this.k8sApi = new k8s.KubeConfig();
this.k8sApi.loadFromDefault();
this.customObjectsApi = this.k8sApi.makeApiClient(k8s.CustomObjectsApi);
}
public async reconcile(namespace: string, name: string): Promise<void> {
console.log(`[Reconcile] Starting for ${namespace}/${name}`);
try {
const { body: resource } = await this.customObjectsApi.getNamespacedCustomObject(
CUSTOM_GROUP,
CUSTOM_VERSION,
namespace,
CUSTOM_PLURAL,
name
) as { body: SagaTransaction };
const currentStatus = resource.status || { phase: 'Pending' };
switch (currentStatus.phase) {
case 'Pending':
await this.handlePending(resource);
break;
case 'Processing':
await this.handleProcessing(resource);
break;
case 'Compensating':
await this.handleCompensating(resource);
break;
case 'Succeeded':
case 'Failed':
// Terminal states, do nothing.
break;
}
} catch (e: any) {
// 如果资源不存在 (404),说明已被删除,直接忽略
if (e.statusCode === 404) {
console.log(`[Reconcile] Resource ${namespace}/${name} not found, likely deleted.`);
return;
}
console.error(`[Reconcile] Error processing ${namespace}/${name}:`, e.body?.message || e.message);
}
}
private async handlePending(resource: SagaTransaction): Promise<void> {
console.log(`[Phase: Pending] Transitioning to Processing for ${resource.metadata!.name}`);
const newStatus: SagaTransactionStatus = {
phase: 'Processing',
currentStep: resource.spec.steps[0].name,
completedSteps: [],
retries: 0,
};
await this.updateStatus(resource, newStatus);
}
private async handleProcessing(resource: SagaTransaction): Promise<void> {
const status = resource.status!;
const allStepNames = resource.spec.steps.map(s => s.name);
const completedSteps = new Set(status.completedSteps || []);
const nextStep = resource.spec.steps.find(s => !completedSteps.has(s.name));
if (!nextStep) {
console.log(`[Phase: Processing] All steps completed. Transitioning to Succeeded for ${resource.metadata!.name}`);
const newStatus: SagaTransactionStatus = { ...status, phase: 'Succeeded', currentStep: '' };
await this.updateStatus(resource, newStatus);
return;
}
console.log(`[Phase: Processing] Executing step: ${nextStep.name}`);
try {
// 这里的 step executor 必须包含超时和重试逻辑
await this.executeStep(nextStep.action);
const newCompletedSteps = [...(status.completedSteps || []), nextStep.name];
const nextStepIndex = allStepNames.indexOf(nextStep.name) + 1;
const newCurrentStep = nextStepIndex < allStepNames.length ? allStepNames[nextStepIndex] : '';
const newStatus: SagaTransactionStatus = {
...status,
completedSteps: newCompletedSteps,
currentStep: newCurrentStep,
lastErrorMessage: '',
retries: 0,
};
await this.updateStatus(resource, newStatus);
} catch (error: any) {
console.error(`[Phase: Processing] Step ${nextStep.name} failed for ${resource.metadata!.name}:`, error.message);
const newStatus: SagaTransactionStatus = {
...status,
phase: 'Compensating',
lastErrorMessage: `Action failed: ${error.message}`,
};
await this.updateStatus(resource, newStatus);
}
}
private async handleCompensating(resource: SagaTransaction): Promise<void> {
const status = resource.status!;
const completedSteps = status.completedSteps || [];
if (completedSteps.length === 0) {
console.log(`[Phase: Compensating] No steps to compensate. Transitioning to Failed for ${resource.metadata!.name}`);
const newStatus: SagaTransactionStatus = { ...status, phase: 'Failed', currentStep: '' };
await this.updateStatus(resource, newStatus);
return;
}
// 从已完成的步骤中,找到最后一个进行补偿
const lastCompletedStepName = completedSteps[completedSteps.length - 1];
const stepToCompensate = resource.spec.steps.find(s => s.name === lastCompletedStepName)!;
console.log(`[Phase: Compensating] Executing compensation for step: ${stepToCompensate.name}`);
try {
// 补偿操作也必须是幂等的
await this.executeStep(stepToCompensate.compensate);
const newCompletedSteps = completedSteps.slice(0, -1);
const newStatus: SagaTransactionStatus = {
...status,
completedSteps: newCompletedSteps,
currentStep: newCompletedSteps.length > 0 ? newCompletedSteps[newCompletedSteps.length - 1] : '',
};
// 递归补偿下一个
await this.updateStatus(resource, newStatus);
} catch (error: any) {
// 这里的坑在于:如果补偿操作本身失败了怎么办?
// 真实项目中,这里需要实现带退避策略的重试。如果持续失败,
// 只能将状态置为 `CompensationFailed` 并触发告警,需要人工介入。
console.error(`[Phase: Compensating] Compensation for ${stepToCompensate.name} failed! Manual intervention may be required.`);
const retries = (status.retries || 0) + 1;
const newStatus: SagaTransactionStatus = {
...status,
lastErrorMessage: `Compensation failed: ${error.message}`,
retries: retries,
};
// 这里只是简单更新状态,生产环境需要更复杂的逻辑
await this.updateStatus(resource, newStatus);
}
}
private async executeStep(operation: SagaStep['action'] | SagaStep['compensate']): Promise<void> {
// 生产级的 executor 需要更复杂的实现
// 包含: mTLS, service discovery, request tracing, timeout, retry with backoff
await axios({
method: operation.method,
url: operation.endpoint, // e.g., http://user-service.default.svc.cluster.local/users
data: operation.payload,
timeout: 5000, // 关键:必须设置超时
});
}
private async updateStatus(resource: SagaTransaction, newStatus: SagaTransactionStatus): Promise<void> {
const patch = [
{
op: 'replace',
path: '/status',
value: newStatus,
},
];
try {
await this.customObjectsApi.patchNamespacedCustomObjectStatus(
CUSTOM_GROUP,
CUSTOM_VERSION,
resource.metadata!.namespace!,
CUSTOM_PLURAL,
resource.metadata!.name!,
patch,
undefined,
undefined,
undefined,
{ headers: { 'Content-Type': 'application/json-patch+json' } }
);
} catch (e: any) {
console.error(`[UpdateStatus] Failed to update status for ${resource.metadata!.name}:`, e.body?.message || e.message);
// 状态更新失败是一个严重问题,可能会导致调谐循环处理同一个旧状态的资源,
// 必须谨慎处理。
throw e;
}
}
}
3. 流程可视化
整个交互流程可以用时序图清晰地表示:
sequenceDiagram participant Client as User Application participant K8sApi as Kubernetes API Server participant Operator as Saga Operator participant SvcA as User Service (Shard 1) participant SvcB as Account Service (Shard 2) Client->>+K8sApi: CREATE SagaTransaction resource (spec) K8sApi-->>-Client: Ack Note right of K8sApi: Watch event triggered K8sApi->>+Operator: Notify: New SagaTransaction created Operator->>Operator: Reconcile(): state is 'Pending' Operator->>+K8sApi: UPDATE Status: phase='Processing', currentStep='createUser' K8sApi-->>-Operator: Ack Operator->>+SvcA: POST /users (action) SvcA-->>-Operator: 201 Created Operator->>Operator: Reconcile(): step 'createUser' succeeded Operator->>+K8sApi: UPDATE Status: completedSteps=['createUser'], currentStep='createAccount' K8sApi-->>-Operator: Ack Operator->>+SvcB: POST /accounts (action) SvcB-->>-Operator: 500 Internal Server Error (Failure!) Operator->>Operator: Reconcile(): step 'createAccount' failed Operator->>+K8sApi: UPDATE Status: phase='Compensating' K8sApi-->>-Operator: Ack Operator->>+SvcA: DELETE /users (compensate for 'createUser') SvcA-->>-Operator: 204 No Content Operator->>Operator: Reconcile(): compensation for 'createUser' succeeded Operator->>+K8sApi: UPDATE Status: phase='Failed', completedSteps=[] K8sApi-->>-Operator: Ack
架构的局限性与未来展望
此方案并非银弹。它本质上提供的是“最终一致性”,不适用于需要强一致性的场景(例如金融交易中的转账)。整个事务的完成时间取决于调谐循环的间隔、网络延迟和业务服务的处理时间,其实时性低于传统两阶段提交(2PC),但可用性远高于后者。
一个常见的错误是,认为Operator可以解决所有问题。补偿逻辑的幂等性、业务服务的可靠性,这些仍然是应用开发者必须关注的核心。例如,createUser
的补偿操作deleteUser
,如果deleteUser
接口被重复调用,必须保证第二次调用不会报错或产生副作用。在Code Review中,对补偿接口的幂等性测试用例的审查,其重要性不亚于对核心业务逻辑的审查。
未来的演进方向可以是在CRD中加入更复杂的流程控制,如并行步骤(parallelSteps
)、条件分支(when
clause),使其成为一个更通用的云原生工作流引擎。此外,集成服务网格(Service Mesh)可以天然地为executeStep
提供重试、超时和熔断能力,进一步将这些网络通信的韧性策略从Operator代码中剥离,下沉到基础设施层。