使用 Ray Actors 构建一个分布式、有状态的断路器服务


一个断路器的核心本质是一个状态机。它封装了对下游服务的调用,并根据失败率在三个状态之间转换:CLOSED(允许所有调用通过)、OPEN(立即拒绝所有调用)和 HALF_OPEN(允许有限的探测调用以确定服务是否恢复)。在单体应用或单个进程中,实现这个状态机相对直接,一个类实例就能维护所有状态。

但在一个由数十上百个微服务构成的分布式系统中,这个简单的模型开始失效。问题在于状态的共享与一致性。如果服务A的10个实例都各自维护一个针对服务B的断路器状态,那么当服务B出现故障时,需要累计 10 * failure_threshold 次失败才能让所有实例都熔断。同样,当服务B恢复时,恢复的感知也是分散和延迟的。我们需要的是一个针对服务B的、逻辑上统一的断路器,它的状态被所有调用方实例共享。

这正是 Ray Actors 发挥作用的地方。Ray Actor 是一个有状态的、可通过句柄异步调用的 Python 对象。Ray 运行时保证一个 Actor 实例一次只执行一个方法,从而天然地解决了并发访问状态的竞态问题。我们可以将每个断路器的状态机封装在一个专用的 Actor 中,从而创建一个分布式的、有状态的、可集中管理的断路器服务。

架构设计

我们的目标是构建一个平台级的断路器服务,而不是让每个业务服务都去关心断路器的实现细节。

整体架构如下:

  1. CircuitBreakerActor: 每个需要保护的下游服务(或其特定API)都对应一个此类型的 Actor 实例。它独立维护着 CLOSED, OPEN, HALF_OPEN 的状态、失败计数、最后失败时间等。这是断路器逻辑的核心。
  2. BreakerManagerActor: 这是一个单例 Actor,作为服务注册中心。它负责创建、存储和分发所有 CircuitBreakerActor 的句柄。业务方通过一个唯一的服务名向它请求对应的断路器句柄。这样做的好处是集中管理和配置。
  3. Client-Side Wrapper: 一个轻量级的客户端函数或装饰器,它封装了与 Actor 通信的逻辑,使业务代码可以像调用普通函数一样使用断路器。
graph TD
    subgraph Ray Cluster
        BM(BreakerManagerActor)
        CBA1(CircuitBreakerActor 
for 'user-service') CBA2(CircuitBreakerActor
for 'payment-service') end subgraph Business Services ServiceA1(Service A - Instance 1) ServiceA2(Service A - Instance 2) ServiceB1(Service B - Instance 1) end subgraph Downstream Dependencies UserService(User Service API) PaymentService(Payment Service API) end ServiceA1 -- 1. Get Handle for 'user-service' --> BM ServiceA2 -- 1. Get Handle for 'user-service' --> BM BM -- 2. Returns Handle --> ServiceA1 BM -- 2. Returns Handle --> ServiceA2 ServiceA1 -- 3. Execute call via Actor --> CBA1 ServiceA2 -- 3. Execute call via Actor --> CBA1 CBA1 -- 4. If state is CLOSED --> UserService ServiceB1 -- 1. Get Handle for 'payment-service' --> BM BM -- 2. Returns Handle --> ServiceB1 ServiceB1 -- 3. Execute call via Actor --> CBA2 CBA2 -- 4. If state is CLOSED --> PaymentService

这种设计的关键优势在于,所有Service A的实例共享同一个CircuitBreakerActor实例(CBA1),因此它们看到的是完全一致的断路器状态。

核心实现:CircuitBreakerActor

这是整个系统的核心。我们需要精确地管理状态转换逻辑。一个常见的错误是状态转换的判断条件过于简单,忽略了时间窗口和并发场景。

import time
import asyncio
import logging
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, Any, Dict, Type

import ray

# 配置结构化的日志,这在分布式系统中排查问题至关重要
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class CircuitState(Enum):
    CLOSED = "CLOSED"
    OPEN = "OPEN"
    HALF_OPEN = "HALF_OPEN"

class CircuitOpenError(Exception):
    """当断路器处于OPEN状态时,拒绝调用时抛出"""
    def __init__(self, service_name: str, message: str):
        self.service_name = service_name
        super().__init__(message)

@dataclass
class BreakerConfig:
    """
    断路器配置,使用dataclass使其清晰且不可变。
    """
    failure_threshold: int = 5  # 连续失败多少次后打开断路器
    recovery_timeout_seconds: int = 30  # OPEN状态持续多久后转为HALF_OPEN
    half_open_success_threshold: int = 3 # HALF_OPEN状态下需要多少次成功调用才能关闭
    expected_exceptions: list[Type[Exception]] = field(default_factory=lambda: [Exception])

@ray.remote(num_cpus=0.1)  # Actors是轻量级的,给一个很小的CPU资源
class CircuitBreakerActor:
    """
    一个独立的、有状态的Actor,用于管理单个服务的断路器逻辑。
    """
    def __init__(self, service_name: str, config: BreakerConfig):
        self.service_name = service_name
        self.config = config
        
        # 状态变量
        self._state: CircuitState = CircuitState.CLOSED
        self._failure_count: int = 0
        self._last_failure_time: float = 0.0
        self._half_open_success_count: int = 0
        
        logger.info(f"[Breaker '{self.service_name}'] Actor initialized with config: {config}")

    def get_state(self) -> dict:
        """提供一个查询方法,用于监控和调试"""
        return {
            "service_name": self.service_name,
            "state": self._state.value,
            "failure_count": self._failure_count,
            "last_failure_time": self._last_failure_time,
            "half_open_success_count": self._half_open_success_count
        }

    def _is_exception_relevant(self, exc: Exception) -> bool:
        """检查捕获的异常是否是我们需要计数的失败类型"""
        return any(isinstance(exc, exc_type) for exc_type in self.config.expected_exceptions)

    def _trip(self):
        """状态转换: CLOSED -> OPEN"""
        if self._state == CircuitState.CLOSED:
            self._state = CircuitState.OPEN
            self._last_failure_time = time.monotonic()
            logger.warning(f"[Breaker '{self.service_name}'] Tripped to OPEN state.")

    def _reset(self):
        """状态转换: -> CLOSED"""
        if self._state != CircuitState.CLOSED:
            logger.info(f"[Breaker '{self.service_name}'] Reset to CLOSED state.")
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._last_failure_time = 0.0
        self._half_open_success_count = 0

    def _attempt_reset(self):
        """状态转换: OPEN -> HALF_OPEN"""
        now = time.monotonic()
        if self._state == CircuitState.OPEN and now - self._last_failure_time >= self.config.recovery_timeout_seconds:
            self._state = CircuitState.HALF_OPEN
            self._half_open_success_count = 0
            logger.info(f"[Breaker '{self.service_name}'] Moved to HALF_OPEN state.")

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """
        核心执行逻辑,包裹实际的函数调用。
        这里的'func'不能直接传递,因为函数对象可能无法被Ray序列化。
        更稳健的方式是传递服务标识和参数,由Actor内部逻辑决定调用哪个真实服务。
        为简化示例,我们假设传递的函数是可序列化的。
        """
        self._attempt_reset() # 每次调用前都检查是否可以从OPEN变为HALF_OPEN

        if self._state == CircuitState.OPEN:
            raise CircuitOpenError(
                self.service_name,
                f"Circuit for '{self.service_name}' is open. Call rejected."
            )

        # 在 CLOSED 或 HALF_OPEN 状态下,执行调用
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            if self._is_exception_relevant(e):
                self._on_failure()
            # 无论是否是相关异常,都必须向上抛出,让调用方知道发生了错误
            raise

    def _on_success(self):
        """调用成功时的状态处理"""
        if self._state == CircuitState.HALF_OPEN:
            self._half_open_success_count += 1
            if self._half_open_success_count >= self.config.half_open_success_threshold:
                self._reset()
        elif self._state == CircuitState.CLOSED:
            # 在CLOSED状态下成功,重置失败计数
            if self._failure_count > 0:
                self._reset()
    
    def _on_failure(self):
        """调用失败时的状态处理"""
        if self._state == CircuitState.HALF_OPEN:
            # 在 HALF_OPEN 状态下任何一次失败都会立即回到 OPEN
            self._trip()
            return

        # 在 CLOSED 状态下
        self._failure_count += 1
        if self._failure_count >= self.config.failure_threshold:
            self._trip()

这个 Actor 的实现有几个关键点:

  • 状态隔离: 所有状态变量 (_state, _failure_count等) 都被封装在 Actor 内部。Ray 保证了对这些变量的访问是串行的,我们无需手动加锁。
  • 时间源: 使用 time.monotonic() 而不是 time.time(),因为它不受系统时间变化的影响,对于计算时间间隔更可靠。
  • 异步支持: call 方法被定义为 async,使其可以包裹异步的下游调用,这在现代Python服务中非常普遍。
  • 异常处理: 明确区分需要触发熔断的异常类型 (expected_exceptions),避免因为客户端的参数错误等问题导致断路器误判。

管理器实现:BreakerManagerActor

这个 Actor 相对简单,它的职责就像一个字典或工厂,但运行在 Ray 集群中,可以被任何节点访问。

@ray.remote(num_cpus=0.1)
class BreakerManagerActor:
    def __init__(self):
        # 存储 service_name -> CircuitBreakerActor handle 的映射
        self._breakers: Dict[str, ray.actor.ActorHandle] = {}
        logger.info("BreakerManagerActor initialized.")

    def register_breaker(self, service_name: str, config: BreakerConfig, force_recreate: bool = False):
        """
        注册或获取一个断路器。
        在真实项目中,这里可以从分布式配置中心加载配置。
        """
        if service_name not in self._breakers or force_recreate:
            logger.info(f"Creating new CircuitBreakerActor for '{service_name}'.")
            
            # 动态命名Actor,便于在Ray Dashboard中观察
            actor_name = f"CircuitBreaker-{service_name}"
            
            # .options() 用于指定Actor的创建参数
            self._breakers[service_name] = CircuitBreakerActor.options(
                name=actor_name, lifetime="detached"
            ).remote(service_name, config)
        return self._breakers[service_name]

    def get_breaker(self, service_name: str) -> ray.actor.ActorHandle:
        """获取断路器句柄"""
        if service_name not in self._breakers:
            raise ValueError(f"No breaker registered for service '{service_name}'")
        return self._breakers[service_name]

    def list_breakers(self) -> Dict[str, Any]:
        """列出所有断路器的当前状态,用于监控"""
        # 使用 ray.get 等待所有actor返回状态
        states = {
            name: handle.get_state.remote()
            for name, handle in self._breakers.items()
        }
        return ray.get(list(states.values()))

lifetime="detached" 是一个重要的生产环境参数。它意味着即使创建这个 Actor 的 Driver 进程退出了,这个 Actor 仍然会存活在 Ray 集群中,直到集群关闭。这正是我们需要的“服务”行为。

客户端使用与实践

现在看看业务代码如何使用这个服务。

# --- 模拟一个不稳定的下游服务 ---
class UnstableService:
    def __init__(self):
        self.is_healthy = True
        self._counter = 0

    async def make_call(self, payload: str) -> str:
        self._counter += 1
        # 模拟服务在调用几次后开始故障
        if self._counter > 5 and self._counter < 15:
            self.is_healthy = False
        else:
            self.is_healthy = True
        
        await asyncio.sleep(0.1) # 模拟网络延迟
        if not self.is_healthy:
            raise ConnectionError("Service is down")
        return f"Success: processed {payload}"

# --- 客户端封装 ---
async def call_with_breaker(
    manager: ray.actor.ActorHandle, 
    service_name: str, 
    func: Callable, 
    *args, 
    **kwargs
):
    try:
        # 1. 从管理器获取断路器句柄
        breaker_handle = await manager.get_breaker.remote(service_name)
        
        # 2. 通过断路器Actor执行调用
        result = await breaker_handle.call.remote(func, *args, **kwargs)
        logger.info(f"Call to '{service_name}' succeeded. Result: {result}")
        return result
    except CircuitOpenError as e:
        # 断路器打开,执行降级逻辑
        logger.error(f"Circuit open for '{e.service_name}'. Fallback initiated.")
        return "Fallback data"
    except Exception as e:
        # 其他异常(包括触发熔断的下游服务异常)
        logger.error(f"Call to '{service_name}' failed with error: {e}")
        # 这里也可以执行降级逻辑
        return "Fallback data on failure"

# --- 启动和运行 ---
async def main():
    # 在本地模拟一个Ray集群
    ray.init(ignore_reinit_error=True, num_cpus=4)
    
    # 创建并命名Manager Actor
    manager = BreakerManagerActor.options(name="BreakerManager", lifetime="detached").remote()

    # 定义一个服务的断路器配置
    service_config = BreakerConfig(
        failure_threshold=3,
        recovery_timeout_seconds=5,
        half_open_success_threshold=2,
        expected_exceptions=[ConnectionError]
    )
    
    # 注册断路器
    await manager.register_breaker.remote("unstable-api", service_config)

    unstable_service = UnstableService()

    # 模拟连续调用
    for i in range(20):
        print(f"\n--- Attempt {i+1} ---")
        await call_with_breaker(
            manager, 
            "unstable-api", 
            unstable_service.make_call, 
            f"data_{i+1}"
        )
        
        # 打印当前所有断路器的状态
        states = await manager.list_breakers.remote()
        print("Current Breaker States:", states)

        await asyncio.sleep(0.5)

if __name__ == "__main__":
    asyncio.run(main())
    ray.shutdown()

运行这段代码,你会观察到以下行为:

  1. 初始阶段 (CLOSED): 前5次调用成功,断路器状态保持 CLOSED
  2. 触发熔断 (-> OPEN): 第6、7、8次调用失败 (ConnectionError),失败计数达到阈值3,断路器状态变为 OPEN
  3. 熔断期间 (OPEN): 接下来的调用会立即失败并抛出 CircuitOpenError,根本不会去请求不稳定的服务,从而保护了系统资源。
  4. 进入半开 (-> HALF_OPEN): 5秒恢复期过后,断路器进入 HALF_OPEN
  5. 恢复探测 (HALF_OPEN): 此时服务已经恢复。HALF_OPEN 状态下的第一次、第二次调用成功,达到 half_open_success_threshold 阈值。
  6. 完全恢复 (-> CLOSED): 断路器状态重置为 CLOSED,系统恢复正常。

生产环境的考量与局限

尽管这个基于 Ray Actor 的方案优雅地解决了分布式状态管理问题,但在真实项目中,还有几个坑需要注意:

  1. Actor 句柄的获取成本: ray.get(manager.get_breaker.remote(...)) 包含一次网络 RTT。客户端不应该在每次调用时都去获取句柄,而应该在初始化时获取一次并缓存起来。

  2. BreakerManagerActor 的单点问题: 虽然 Actor 本身是容错的(Ray 会在节点失败时尝试重启它),但这个管理器在逻辑上是个单点。对于超大规模系统,可以考虑使用 Ray 的 Placement Groups 来部署一组管理器 Actor,或者干脆让客户端在启动时从管理器获取所有需要的断路器句柄,之后不再依赖管理器。

  3. 调用延迟: 每次通过断路器调用,都会增加一次到 CircuitBreakerActor 的网络延迟。对于延迟极其敏感的服务,这种模型可能不适用。它更适合那些下游服务本身延迟较高(例如几十毫秒到几百毫秒)的场景,此时增加的几毫秒 Actor 通信延迟可以忽略不计。

  4. 监控与可观测性: get_statelist_breakers 提供了基本的状态查询。在生产环境中,每个 CircuitBreakerActor 都应该主动将状态变化、成功/失败计数等指标推送到 Prometheus 或类似监控系统中,以便进行告警和趋势分析。这可以通过在 Actor 中集成一个 metrics 客户端来实现。

这个方案的适用边界在于那些需要对外部依赖(如第三方API、数据库、其他微服务)进行统一、有状态的弹性保护,并且能够接受少量额外通信开销的分布式应用。它将复杂的并发状态控制逻辑委托给 Ray 运行时,让开发者可以专注于业务逻辑本身,是 Ray 核心库在构建弹性基础设施组件方面一个强有力的应用范例。


  目录