在 Serverless 架构中集成 etcd 与 Memcached 构建幂等且高并发的代码格式化任务


我们的 GitOps 流程开始出现一个棘手的问题。在一个庞大的 Monorepo 中,多个 CI/CD 流水线可能被几乎同时触发,它们都会尝试对同一组配置文件(例如共享的 package.jsontsconfig.json)执行自动化代码格式化。这直接导致了频繁的 Git 冲突和流水线失败。起初的解决方案是在 CI Runner 内部加一个本地文件锁,但这在 Vercel Functions 这种分布式、无状态的 Serverless 环境下毫无意义,每一次函数调用都是一个独立的实例。我们需要一个跨实例、跨调用的全局锁。

初步构想是利用数据库行锁,但为这样一个高频、短时效的锁机制引入一个重型数据库,并在 Serverless 函数中管理其连接池,显得过于笨重且成本高昂。我们需要的是一个轻量级、专为分布式协调而生的工具。这自然而然地指向了 etcd。它的租约(Lease)和锁(Lock)机制似乎是为这个场景量身定做的。

最终的技术方案定型如下:

  1. 执行环境: Vercel Functions 作为 Serverless API 端点,接收格式化请求。
  2. 核心逻辑: Prettier 作为代码格式化引擎,在函数内部执行。
  3. 分布式锁: etcd 提供全局唯一的锁,确保对同一个资源(文件)的格式化操作是串行的。租约机制可以保证即使函数异常退出,锁也能自动释放,避免死锁。
  4. 性能优化: 在 etcd 之前增加一层 Memcached 缓存。对于一个已经被锁定的资源,后续大量的请求可以直接被 Memcached 快速拒绝,避免对 etcd 集群造成不必要的压力。
  5. 异步通知: 格式化任务完成后,通过 AWS SNS 发送一个通知。这使得核心服务与后续的流程(如更新 GitHub Check、发送 Slack 通知)完全解耦。

下面是这个系统的架构流程图。

sequenceDiagram
    participant Client as CI/CD Pipeline
    participant Vercel as Vercel Function API
    participant Cache as Memcached
    participant Coordinator as etcd
    participant Notifier as AWS SNS

    Client->>+Vercel: POST /api/format { resourceId: 'file/A' }
    Vercel->>+Cache: GET lock:file/A
    Cache-->>-Vercel: Cache Miss
    Vercel->>+Coordinator: Acquire Lock on 'file/A' with Lease
    Coordinator-->>-Vercel: Lock Acquired
    Vercel->>+Cache: SET lock:file/A (with TTL)
    Cache-->>-Vercel: OK
    Vercel->>Vercel: Run Prettier formatting...
    Vercel->>+Notifier: Publish { resourceId: 'file/A', status: 'success' }
    Notifier-->>-Vercel: Message Published
    Vercel->>+Coordinator: Release Lock on 'file/A'
    Coordinator-->>-Vercel: Lock Released
    Vercel-->>-Client: { status: 'processing' }

    Note over Client, Notifier: Subsequent status updates are handled asynchronously.

    Client->>+Vercel: POST /api/format { resourceId: 'file/A' } (Concurrent Request)
    Vercel->>+Cache: GET lock:file/A
    Cache-->>-Vercel: Cache Hit (Locked)
    Vercel-->>-Client: { status: 'locked', message: 'Resource is being processed.' }

环境准备与配置

在真实项目中,etcdMemcached 实例需要高可用部署。为简化演示,我们假设其连接信息通过环境变量注入到 Vercel Functions 中。

.env.local 文件内容示例:

# etcd 集群的端点,多个用逗号分隔
ETCD_ENDPOINTS=http://etcd-node1:2379,http://etcd-node2:2379

# Memcached 服务器地址
MEMCACHED_SERVER=memcached-server:11211

# AWS SNS 配置
AWS_REGION=us-east-1
AWS_SNS_TOPIC_ARN=arn:aws:sns:us-east-1:123456789012:code-format-results
AWS_ACCESS_KEY_ID=YOUR_AWS_ACCESS_KEY
AWS_SECRET_ACCESS_KEY=YOUR_AWS_SECRET_KEY

项目依赖如下:

{
  "dependencies": {
    "etcd3": "^1.1.1",
    "memcached": "^2.2.2",
    "@aws-sdk/client-sns": "^3.433.0",
    "prettier": "^3.0.3"
  },
  "devDependencies": {
    "@vercel/node": "^3.0.7",
    "typescript": "^5.2.2"
  }
}

核心实现:Vercel Function

我们将所有逻辑封装在 api/format.ts 文件中。代码必须是生产级的,包含完整的错误处理、日志和配置管理。

// api/format.ts

import type { VercelRequest, VercelResponse } from '@vercel/node';
import { Etcd3, ILease, ILock } from 'etcd3';
import Memcached from 'memcached';
import { SNSClient, PublishCommand } from '@aws-sdk/client-sns';
import prettier from 'prettier';

// --- 1. 初始化与配置 ---
// 在真实项目中,这些客户端实例应该在函数外部初始化,以便在函数调用之间复用
// Vercel Serverless Functions 的冷启动行为会影响这一点,但最佳实践仍是如此。

const etcdClient = new Etcd3({
  endpoints: process.env.ETCD_ENDPOINTS?.split(',') || ['http://127.0.0.1:2379'],
});

const memcached = new Memcached(process.env.MEMCACHED_SERVER || '127.0.0.1:11211', {
  retries: 2,
  timeout: 1000,
  remove: true,
});

const snsClient = new SNSClient({
  region: process.env.AWS_REGION,
  credentials: {
    accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
    secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
  },
});

const LOCK_TTL_SECONDS = 30; // 锁的最长持有时间,防止函数崩溃导致死锁
const CACHE_TTL_SECONDS = LOCK_TTL_SECONDS; // 缓存 TTL 应与锁的租约时间匹配

// --- 2. 核心处理函数 ---
export default async function handler(
  req: VercelRequest,
  res: VercelResponse,
) {
  if (req.method !== 'POST') {
    return res.status(405).json({ error: 'Method Not Allowed' });
  }

  const { resourceId, codeContent } = req.body;
  if (!resourceId || typeof resourceId !== 'string' || !codeContent) {
    return res.status(400).json({ error: 'Missing resourceId or codeContent' });
  }

  const lockKey = `format-lock:${resourceId}`;
  const cacheKey = `lock-status:${resourceId}`;

  // --- 3. 缓存层检查 ---
  try {
    const isLocked = await getFromCache(cacheKey);
    if (isLocked) {
      console.log(`[Cache Hit] Resource ${resourceId} is locked.`);
      // 这里的 429 Too Many Requests 语义上比 409 Conflict 更适合高并发场景
      return res.status(429).json({ status: 'locked', message: 'Resource is currently being processed by another request.' });
    }
  } catch (error) {
    // 即使缓存失败,我们也应该继续尝试获取 etcd 锁,保证系统核心功能可用。
    console.error('[Cache Error] Failed to check cache. Proceeding to etcd.', error);
  }

  // --- 4. 分布式锁获取 ---
  let lease: ILease | null = null;
  let lock: ILock | null = null;
  try {
    lease = etcdClient.lease(LOCK_TTL_SECONDS);
    lock = lease.lock(lockKey);

    // 设置一个较短的超时时间,如果锁被长时间占用,则快速失败
    await lock.acquire(5000); // 尝试获取锁,最多等待5秒

    // 成功获取锁,立即更新缓存
    await setInCache(cacheKey, 'locked', CACHE_TTL_SECONDS);

    // 立即响应客户端,告知任务已开始处理,后续通过 SNS 通知结果
    res.status(202).json({ status: 'processing', message: 'Formatting task accepted.' });

    // --- 5. 执行核心业务逻辑 ---
    // 这是异步执行的部分,不阻塞对客户端的响应
    processFormatting(resourceId, codeContent, lockKey);

  } catch (error: any) {
    // 捕获锁获取失败的情况,通常是由于锁已被占用
    if (error.message && error.message.includes('etcd key')) {
        console.warn(`[Lock Conflict] Failed to acquire lock for ${resourceId}.`);
        // 这里的 409 Conflict 语义是准确的
        if (!res.headersSent) {
          res.status(409).json({ status: 'conflict', message: 'Resource lock conflict.' });
        }
    } else {
        console.error(`[etcd Error] Failed to acquire lock for ${resourceId}:`, error);
        if (!res.headersSent) {
          res.status(503).json({ error: 'Service Unavailable', details: 'Failed to communicate with coordination service.' });
        }
    }
  } finally {
    // 不在这里释放锁,因为格式化任务是异步的。锁的释放将在 processFormatting 中完成。
    // 租约机制保证了即使这里崩溃,锁也会在 TTL 之后自动释放。
  }
}

// --- 6. 异步处理格式化任务 ---
async function processFormatting(resourceId: string, code: string, lockKey: string) {
    let status: 'success' | 'failed' = 'failed';
    let details = '';
    let formattedCode = '';

    try {
        console.log(`[Processing] Starting formatting for ${resourceId}`);
        // 假设这是一个耗时操作
        formattedCode = await prettier.format(code, { parser: 'typescript' });
        status = 'success';
        details = 'Formatting completed successfully.';
        console.log(`[Processing] Formatting for ${resourceId} successful.`);
    } catch (e: any) {
        status = 'failed';
        details = e.message || 'An unknown error occurred during formatting.';
        console.error(`[Processing Error] Formatting failed for ${resourceId}:`, e);
    } finally {
        // --- 7. 释放锁并清理缓存 ---
        // 这里的 finally 块至关重要,无论成功或失败都必须尝试释放锁
        try {
            // 我们通过 key 来释放锁,而不是 lock 对象,因为 lock 对象在 handler 作用域内
            // 这是一个简化,更好的方式是将 lease 和 lock 传递过来
            await etcdClient.unlock(lockKey);
            console.log(`[Lock Released] Lock for ${resourceId} has been released.`);
            
            // 删除缓存,让下一个请求可以立即尝试获取锁
            await deleteFromCache(`lock-status:${resourceId}`);

        } catch (unlockError) {
            console.error(`[CRITICAL] Failed to release lock for ${resourceId}. Lease will expire eventually.`, unlockError);
            // 此时租约是最后的保障
        }

        // --- 8. 发送 SNS 通知 ---
        await publishSnsNotification(resourceId, status, details, status === 'success' ? formattedCode : undefined);
    }
}


// --- 9. 辅助函数 ---

function getFromCache(key: string): Promise<string | null> {
  return new Promise((resolve, reject) => {
    memcached.get(key, (err, data) => {
      if (err) return reject(err);
      resolve(data ? String(data) : null);
    });
  });
}

function setInCache(key: string, value: string, ttl: number): Promise<void> {
  return new Promise((resolve, reject) => {
    memcached.set(key, value, ttl, (err) => {
      if (err) return reject(err);
      resolve();
    });
  });
}

function deleteFromCache(key: string): Promise<void> {
    return new Promise((resolve, reject) => {
        memcached.del(key, (err) => {
            if (err) return reject(err);
            resolve();
        });
    });
}

async function publishSnsNotification(resourceId: string, status: 'success' | 'failed', details: string, formattedCode?: string) {
  const messagePayload = {
    resourceId,
    status,
    details,
    timestamp: new Date().toISOString(),
    // 在真实场景中,大的代码内容不应直接放在消息体中,而是上传到 S3 并在此处提供 URL
    ...(formattedCode && { formattedCodeUri: 's3://bucket/path/to/formatted/code' })
  };

  const command = new PublishCommand({
    TopicArn: process.env.AWS_SNS_TOPIC_ARN,
    Message: JSON.stringify(messagePayload),
    MessageAttributes: {
      'resourceId': { DataType: 'String', StringValue: resourceId },
      'status': { DataType: 'String', StringValue: status }
    }
  });

  try {
    const result = await snsClient.send(command);
    console.log(`[SNS Notification] Published message for ${resourceId}. MessageId: ${result.MessageId}`);
  } catch (error) {
    console.error(`[SNS Error] Failed to publish notification for ${resourceId}:`, error);
    // 这里的失败需要有重试机制或告警,否则下游系统将无法获知结果
  }
}

实现中的权衡与陷阱

  1. etcd 租约 (Lease) 的重要性: 为什么不直接用 lock.lock() 而是 lease.lock()?因为 Serverless 函数可能随时被终止。如果没有租约,一个异常退出的函数将导致死锁。租约机制确保了即使持有锁的客户端失联,锁也会在租约到期后自动被 etcd 集群释放。LOCK_TTL_SECONDS 的选择是一个权衡:太短可能导致长时间任务未完成锁就过期,太长则在函数崩溃后需要更长时间才能恢复。

  2. Memcached 的作用: Memcached 在此架构中并非作为数据持久化,而是纯粹的“流量挡板”。当一个热门文件被锁定时,可能会有数十个并发请求涌入。如果没有 Memcached,这些请求都会去尝试获取 etcd 锁,给 etcd 带来巨大压力。有了 Memcached,除了第一个请求外,其余请求都会在缓存层被快速拒绝,成本极低。这是一个典型的用缓存保护核心协调服务的模式。

  3. 缓存失效与一致性: 缓存的 TTL 设置为与 etcd 租约的 TTL 相同,这是一个简化处理。更严谨的做法是,在释放 etcd 锁后,主动删除缓存键。代码中已经实现了这一点 (deleteFromCache)。这确保了锁被释放后,下一个请求能立即穿透缓存去获取新锁,而不是等待缓存过期。

  4. 异步处理与客户端响应: 我们在成功获取锁之后,立即返回 202 Accepted。这是因为格式化操作本身可能耗时,同步等待会长时间占用 Serverless 函数资源,增加成本且可能导致客户端超时。将实际工作转为异步,并通过 SNS 通知结果,是构建可扩展、高响应服务的标准模式。

  5. 单元测试思路:

    • handler 函数: 模拟 VercelRequest 和 VercelResponse。需要 mock etcd3memcachedaws-sdk 客户端。测试场景包括:请求体格式错误、缓存命中、缓存未命中但 etcd 锁冲突、成功获取锁等。
    • processFormatting 函数: 单独测试此函数。Mock prettier.format使其成功或抛出异常。验证在 finally 块中,etcdClient.unlockpublishSnsNotification 是否被正确调用。
    • 辅助函数: 对 getFromCache, setInCache 等 Promise 化的辅助函数进行独立测试,确保它们正确处理了底层回调的成功与失败。

局限性与未来迭代方向

此方案虽然解决了核心的并发控制问题,但在生产环境中仍有可优化的空间。首先,直接依赖自建的 etcdMemcached 集群带来了显著的运维负担。对于云原生环境,更理想的选择是使用云厂商提供的托管服务,例如 AWS ElastiCache for Memcached,以及寻找 etcd 的托管替代方案(或在 EKS/ACK 等托管 K8s 集群中利用其内置的 etcd)。

其次,将格式化后的代码内容直接放入 SNS 消息是不可取的,SNS 消息体有 256KB 的限制。正确的做法是将结果上传到 S3 或其他对象存储,然后在 SNS 消息中只包含该对象的引用(URI)。

最后,当前的锁粒度是基于 resourceId(如文件路径)。对于更复杂的场景,可以考虑基于文件内容的哈希来生成锁键。这样,即便是不同路径下的相同内容文件,也可以共享格式化结果,避免重复劳动。这需要一个额外的、基于内容哈希的结果缓存层,可以进一步提升系统效率。


  目录