我们的 GitOps 流程开始出现一个棘手的问题。在一个庞大的 Monorepo 中,多个 CI/CD 流水线可能被几乎同时触发,它们都会尝试对同一组配置文件(例如共享的 package.json
或 tsconfig.json
)执行自动化代码格式化。这直接导致了频繁的 Git 冲突和流水线失败。起初的解决方案是在 CI Runner 内部加一个本地文件锁,但这在 Vercel Functions 这种分布式、无状态的 Serverless 环境下毫无意义,每一次函数调用都是一个独立的实例。我们需要一个跨实例、跨调用的全局锁。
初步构想是利用数据库行锁,但为这样一个高频、短时效的锁机制引入一个重型数据库,并在 Serverless 函数中管理其连接池,显得过于笨重且成本高昂。我们需要的是一个轻量级、专为分布式协调而生的工具。这自然而然地指向了 etcd
。它的租约(Lease)和锁(Lock)机制似乎是为这个场景量身定做的。
最终的技术方案定型如下:
- 执行环境:
Vercel Functions
作为 Serverless API 端点,接收格式化请求。 - 核心逻辑:
Prettier
作为代码格式化引擎,在函数内部执行。 - 分布式锁:
etcd
提供全局唯一的锁,确保对同一个资源(文件)的格式化操作是串行的。租约机制可以保证即使函数异常退出,锁也能自动释放,避免死锁。 - 性能优化: 在
etcd
之前增加一层Memcached
缓存。对于一个已经被锁定的资源,后续大量的请求可以直接被 Memcached 快速拒绝,避免对etcd
集群造成不必要的压力。 - 异步通知: 格式化任务完成后,通过
AWS SNS
发送一个通知。这使得核心服务与后续的流程(如更新 GitHub Check、发送 Slack 通知)完全解耦。
下面是这个系统的架构流程图。
sequenceDiagram participant Client as CI/CD Pipeline participant Vercel as Vercel Function API participant Cache as Memcached participant Coordinator as etcd participant Notifier as AWS SNS Client->>+Vercel: POST /api/format { resourceId: 'file/A' } Vercel->>+Cache: GET lock:file/A Cache-->>-Vercel: Cache Miss Vercel->>+Coordinator: Acquire Lock on 'file/A' with Lease Coordinator-->>-Vercel: Lock Acquired Vercel->>+Cache: SET lock:file/A (with TTL) Cache-->>-Vercel: OK Vercel->>Vercel: Run Prettier formatting... Vercel->>+Notifier: Publish { resourceId: 'file/A', status: 'success' } Notifier-->>-Vercel: Message Published Vercel->>+Coordinator: Release Lock on 'file/A' Coordinator-->>-Vercel: Lock Released Vercel-->>-Client: { status: 'processing' } Note over Client, Notifier: Subsequent status updates are handled asynchronously. Client->>+Vercel: POST /api/format { resourceId: 'file/A' } (Concurrent Request) Vercel->>+Cache: GET lock:file/A Cache-->>-Vercel: Cache Hit (Locked) Vercel-->>-Client: { status: 'locked', message: 'Resource is being processed.' }
环境准备与配置
在真实项目中,etcd
和 Memcached
实例需要高可用部署。为简化演示,我们假设其连接信息通过环境变量注入到 Vercel Functions 中。
.env.local
文件内容示例:
# etcd 集群的端点,多个用逗号分隔
ETCD_ENDPOINTS=http://etcd-node1:2379,http://etcd-node2:2379
# Memcached 服务器地址
MEMCACHED_SERVER=memcached-server:11211
# AWS SNS 配置
AWS_REGION=us-east-1
AWS_SNS_TOPIC_ARN=arn:aws:sns:us-east-1:123456789012:code-format-results
AWS_ACCESS_KEY_ID=YOUR_AWS_ACCESS_KEY
AWS_SECRET_ACCESS_KEY=YOUR_AWS_SECRET_KEY
项目依赖如下:
{
"dependencies": {
"etcd3": "^1.1.1",
"memcached": "^2.2.2",
"@aws-sdk/client-sns": "^3.433.0",
"prettier": "^3.0.3"
},
"devDependencies": {
"@vercel/node": "^3.0.7",
"typescript": "^5.2.2"
}
}
核心实现:Vercel Function
我们将所有逻辑封装在 api/format.ts
文件中。代码必须是生产级的,包含完整的错误处理、日志和配置管理。
// api/format.ts
import type { VercelRequest, VercelResponse } from '@vercel/node';
import { Etcd3, ILease, ILock } from 'etcd3';
import Memcached from 'memcached';
import { SNSClient, PublishCommand } from '@aws-sdk/client-sns';
import prettier from 'prettier';
// --- 1. 初始化与配置 ---
// 在真实项目中,这些客户端实例应该在函数外部初始化,以便在函数调用之间复用
// Vercel Serverless Functions 的冷启动行为会影响这一点,但最佳实践仍是如此。
const etcdClient = new Etcd3({
endpoints: process.env.ETCD_ENDPOINTS?.split(',') || ['http://127.0.0.1:2379'],
});
const memcached = new Memcached(process.env.MEMCACHED_SERVER || '127.0.0.1:11211', {
retries: 2,
timeout: 1000,
remove: true,
});
const snsClient = new SNSClient({
region: process.env.AWS_REGION,
credentials: {
accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
},
});
const LOCK_TTL_SECONDS = 30; // 锁的最长持有时间,防止函数崩溃导致死锁
const CACHE_TTL_SECONDS = LOCK_TTL_SECONDS; // 缓存 TTL 应与锁的租约时间匹配
// --- 2. 核心处理函数 ---
export default async function handler(
req: VercelRequest,
res: VercelResponse,
) {
if (req.method !== 'POST') {
return res.status(405).json({ error: 'Method Not Allowed' });
}
const { resourceId, codeContent } = req.body;
if (!resourceId || typeof resourceId !== 'string' || !codeContent) {
return res.status(400).json({ error: 'Missing resourceId or codeContent' });
}
const lockKey = `format-lock:${resourceId}`;
const cacheKey = `lock-status:${resourceId}`;
// --- 3. 缓存层检查 ---
try {
const isLocked = await getFromCache(cacheKey);
if (isLocked) {
console.log(`[Cache Hit] Resource ${resourceId} is locked.`);
// 这里的 429 Too Many Requests 语义上比 409 Conflict 更适合高并发场景
return res.status(429).json({ status: 'locked', message: 'Resource is currently being processed by another request.' });
}
} catch (error) {
// 即使缓存失败,我们也应该继续尝试获取 etcd 锁,保证系统核心功能可用。
console.error('[Cache Error] Failed to check cache. Proceeding to etcd.', error);
}
// --- 4. 分布式锁获取 ---
let lease: ILease | null = null;
let lock: ILock | null = null;
try {
lease = etcdClient.lease(LOCK_TTL_SECONDS);
lock = lease.lock(lockKey);
// 设置一个较短的超时时间,如果锁被长时间占用,则快速失败
await lock.acquire(5000); // 尝试获取锁,最多等待5秒
// 成功获取锁,立即更新缓存
await setInCache(cacheKey, 'locked', CACHE_TTL_SECONDS);
// 立即响应客户端,告知任务已开始处理,后续通过 SNS 通知结果
res.status(202).json({ status: 'processing', message: 'Formatting task accepted.' });
// --- 5. 执行核心业务逻辑 ---
// 这是异步执行的部分,不阻塞对客户端的响应
processFormatting(resourceId, codeContent, lockKey);
} catch (error: any) {
// 捕获锁获取失败的情况,通常是由于锁已被占用
if (error.message && error.message.includes('etcd key')) {
console.warn(`[Lock Conflict] Failed to acquire lock for ${resourceId}.`);
// 这里的 409 Conflict 语义是准确的
if (!res.headersSent) {
res.status(409).json({ status: 'conflict', message: 'Resource lock conflict.' });
}
} else {
console.error(`[etcd Error] Failed to acquire lock for ${resourceId}:`, error);
if (!res.headersSent) {
res.status(503).json({ error: 'Service Unavailable', details: 'Failed to communicate with coordination service.' });
}
}
} finally {
// 不在这里释放锁,因为格式化任务是异步的。锁的释放将在 processFormatting 中完成。
// 租约机制保证了即使这里崩溃,锁也会在 TTL 之后自动释放。
}
}
// --- 6. 异步处理格式化任务 ---
async function processFormatting(resourceId: string, code: string, lockKey: string) {
let status: 'success' | 'failed' = 'failed';
let details = '';
let formattedCode = '';
try {
console.log(`[Processing] Starting formatting for ${resourceId}`);
// 假设这是一个耗时操作
formattedCode = await prettier.format(code, { parser: 'typescript' });
status = 'success';
details = 'Formatting completed successfully.';
console.log(`[Processing] Formatting for ${resourceId} successful.`);
} catch (e: any) {
status = 'failed';
details = e.message || 'An unknown error occurred during formatting.';
console.error(`[Processing Error] Formatting failed for ${resourceId}:`, e);
} finally {
// --- 7. 释放锁并清理缓存 ---
// 这里的 finally 块至关重要,无论成功或失败都必须尝试释放锁
try {
// 我们通过 key 来释放锁,而不是 lock 对象,因为 lock 对象在 handler 作用域内
// 这是一个简化,更好的方式是将 lease 和 lock 传递过来
await etcdClient.unlock(lockKey);
console.log(`[Lock Released] Lock for ${resourceId} has been released.`);
// 删除缓存,让下一个请求可以立即尝试获取锁
await deleteFromCache(`lock-status:${resourceId}`);
} catch (unlockError) {
console.error(`[CRITICAL] Failed to release lock for ${resourceId}. Lease will expire eventually.`, unlockError);
// 此时租约是最后的保障
}
// --- 8. 发送 SNS 通知 ---
await publishSnsNotification(resourceId, status, details, status === 'success' ? formattedCode : undefined);
}
}
// --- 9. 辅助函数 ---
function getFromCache(key: string): Promise<string | null> {
return new Promise((resolve, reject) => {
memcached.get(key, (err, data) => {
if (err) return reject(err);
resolve(data ? String(data) : null);
});
});
}
function setInCache(key: string, value: string, ttl: number): Promise<void> {
return new Promise((resolve, reject) => {
memcached.set(key, value, ttl, (err) => {
if (err) return reject(err);
resolve();
});
});
}
function deleteFromCache(key: string): Promise<void> {
return new Promise((resolve, reject) => {
memcached.del(key, (err) => {
if (err) return reject(err);
resolve();
});
});
}
async function publishSnsNotification(resourceId: string, status: 'success' | 'failed', details: string, formattedCode?: string) {
const messagePayload = {
resourceId,
status,
details,
timestamp: new Date().toISOString(),
// 在真实场景中,大的代码内容不应直接放在消息体中,而是上传到 S3 并在此处提供 URL
...(formattedCode && { formattedCodeUri: 's3://bucket/path/to/formatted/code' })
};
const command = new PublishCommand({
TopicArn: process.env.AWS_SNS_TOPIC_ARN,
Message: JSON.stringify(messagePayload),
MessageAttributes: {
'resourceId': { DataType: 'String', StringValue: resourceId },
'status': { DataType: 'String', StringValue: status }
}
});
try {
const result = await snsClient.send(command);
console.log(`[SNS Notification] Published message for ${resourceId}. MessageId: ${result.MessageId}`);
} catch (error) {
console.error(`[SNS Error] Failed to publish notification for ${resourceId}:`, error);
// 这里的失败需要有重试机制或告警,否则下游系统将无法获知结果
}
}
实现中的权衡与陷阱
etcd 租约 (Lease) 的重要性: 为什么不直接用
lock.lock()
而是lease.lock()
?因为 Serverless 函数可能随时被终止。如果没有租约,一个异常退出的函数将导致死锁。租约机制确保了即使持有锁的客户端失联,锁也会在租约到期后自动被 etcd 集群释放。LOCK_TTL_SECONDS
的选择是一个权衡:太短可能导致长时间任务未完成锁就过期,太长则在函数崩溃后需要更长时间才能恢复。Memcached 的作用: Memcached 在此架构中并非作为数据持久化,而是纯粹的“流量挡板”。当一个热门文件被锁定时,可能会有数十个并发请求涌入。如果没有 Memcached,这些请求都会去尝试获取 etcd 锁,给 etcd 带来巨大压力。有了 Memcached,除了第一个请求外,其余请求都会在缓存层被快速拒绝,成本极低。这是一个典型的用缓存保护核心协调服务的模式。
缓存失效与一致性: 缓存的 TTL 设置为与 etcd 租约的 TTL 相同,这是一个简化处理。更严谨的做法是,在释放 etcd 锁后,主动删除缓存键。代码中已经实现了这一点 (
deleteFromCache
)。这确保了锁被释放后,下一个请求能立即穿透缓存去获取新锁,而不是等待缓存过期。异步处理与客户端响应: 我们在成功获取锁之后,立即返回
202 Accepted
。这是因为格式化操作本身可能耗时,同步等待会长时间占用 Serverless 函数资源,增加成本且可能导致客户端超时。将实际工作转为异步,并通过 SNS 通知结果,是构建可扩展、高响应服务的标准模式。单元测试思路:
-
handler
函数: 模拟 VercelRequest 和 VercelResponse。需要 mocketcd3
、memcached
和aws-sdk
客户端。测试场景包括:请求体格式错误、缓存命中、缓存未命中但 etcd 锁冲突、成功获取锁等。 -
processFormatting
函数: 单独测试此函数。Mockprettier.format
使其成功或抛出异常。验证在finally
块中,etcdClient.unlock
和publishSnsNotification
是否被正确调用。 - 辅助函数: 对
getFromCache
,setInCache
等 Promise 化的辅助函数进行独立测试,确保它们正确处理了底层回调的成功与失败。
-
局限性与未来迭代方向
此方案虽然解决了核心的并发控制问题,但在生产环境中仍有可优化的空间。首先,直接依赖自建的 etcd
和 Memcached
集群带来了显著的运维负担。对于云原生环境,更理想的选择是使用云厂商提供的托管服务,例如 AWS ElastiCache for Memcached,以及寻找 etcd 的托管替代方案(或在 EKS/ACK 等托管 K8s 集群中利用其内置的 etcd)。
其次,将格式化后的代码内容直接放入 SNS 消息是不可取的,SNS 消息体有 256KB 的限制。正确的做法是将结果上传到 S3 或其他对象存储,然后在 SNS 消息中只包含该对象的引用(URI)。
最后,当前的锁粒度是基于 resourceId
(如文件路径)。对于更复杂的场景,可以考虑基于文件内容的哈希来生成锁键。这样,即便是不同路径下的相同内容文件,也可以共享格式化结果,避免重复劳动。这需要一个额外的、基于内容哈希的结果缓存层,可以进一步提升系统效率。