Building a Production-Grade Stateful Circuit Breaker and Structured Logging Toolkit in PHP


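The root cause of our most recent production incident was a third-party geolocation API that one of our core services depends on heavily. During a high-concurrency period, the API's response time jumped from 50ms to 5s, accompanied by a large number of timeouts. On our side, as the caller, the PHP-FPM process pool filled up quickly, CPU spiked, and the entire user authentication path eventually collapsed in a cascading failure. In the postmortem the problem was clear: our service had no effective isolation from its downstream dependencies, so a failure at a single point was amplified through synchronous calls and propagated to the whole system.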

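In PHP's share-nothing, request-response model, the key to solving this class of problem is a circuit breaker whose state is shared across processes and across requests. A simple in-memory circuit breaker is useless here: each FPM worker has its own memory, and whatever state a request builds up is discarded when the request ends, so nothing carries over. We therefore have to design a stateful circuit breaker backed by external storage. The goal of this build is not only to make it work, but to produce a reusable, configurable, and observable resilience engineering toolkit (Resilience Kit).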

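Initial Design and Technology Choices

Our circuit breaker toolkit (Kit) has to meet several core requirements:

  1. Externalized state storage: state must exist independently of any PHP process.
  2. High performance: reading and writing state must not become a new performance bottleneck.
  3. Atomic operations: counting and state changes must be atomic, to avoid inconsistent data under concurrency.
  4. Easy integration: it should apply to existing business logic with minimal code intrusion.
  5. Strong observability: every state change and every successful or failed call should produce structured logs for monitoring and alerting.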

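With these requirements, the technology choices are fairly clear:

  • State storage: Redis is the obvious choice. Its single-threaded command execution and rich set of atomic commands (such as INCR, SETEX, GET) fit our needs well. Compared with a database it offers lower latency; compared with file locks it avoids complex concurrency control.
  • Logging: our team has fully adopted the PSR-3 standard, and Monolog is the de facto standard library. The Kit should not reinvent the wheel; instead it should provide a Monolog Processor that automatically injects circuit breaker context (such as name and current state) into log records.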

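The core of the Kit consists of three parts:

  1. A flexible Storage interface and its Redis implementation.
  2. The core CircuitBreaker class, which encapsulates the state machine logic.
  3. A CircuitBreakerLogProcessor for Monolog integration.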

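Step-by-Step Implementation: From Interface to Finished Product

We start by defining the interface for the storage adapter. This is good practice: it forces us to program against an interface, and it makes swapping in another store later (for example Memcached or Etcd) straightforward.

1. Storage Layer Abstraction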

<?php

declare(strict_types=1);

namespace ResilienceKit\Storage;

/**
 * Interface StorageAdapter.
 * Defines the contract for storing and retrieving circuit breaker state.
 */
interface StorageAdapter
{
    /**
     * Fetches a value from storage.
     *
     * @param string $key
     * @return mixed
     */
    public function fetch(string $key);

    /**
     * Saves a value to storage.
     *
     * @param string $key
     * @param mixed $value
     * @param int $ttl Seconds until expiry. 0 means it never expires.
     * @return bool
     */
    public function save(string $key, $value, int $ttl = 0): bool;

    /**
     * Deletes a value from storage.
     *
     * @param string $key
     * @return bool
     */
    public function delete(string $key): bool;

    /**
     * Increments a value atomically.
     * If the key does not exist, it is set to 0 before performing the operation.
     *
     * @param string $key
     * @param int $ttl Seconds until expiry, to be set on first increment.
     * @return int The value of key after the increment.
     */
    public function increment(string $key, int $ttl): int;
}

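The interface defines four basic operations: fetch, save, delete, and increment. Of these, increment is the critical one: it must be atomic, and it is used to accumulate the failure count.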
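
Because the breaker only talks to the interface, it can be exercised in unit tests without a Redis server. The following is a minimal sketch of such a test double. The InMemoryAdapter class and its injectable clock are assumptions made for illustration and are not part of the Kit; the interface is repeated in condensed form only so that the snippet runs on its own.

```php
<?php

declare(strict_types=1);

// Condensed copy of the StorageAdapter contract so this snippet runs standalone.
interface StorageAdapter
{
    public function fetch(string $key);
    public function save(string $key, $value, int $ttl = 0): bool;
    public function delete(string $key): bool;
    public function increment(string $key, int $ttl): int;
}

/**
 * Hypothetical in-memory adapter for unit tests (not part of the Kit).
 * State lives in a PHP array, so it is NOT shared between FPM workers.
 */
final class InMemoryAdapter implements StorageAdapter
{
    /** @var array<string, array{value: mixed, expiresAt: ?int}> */
    private array $items = [];

    /** @var callable(): int Returns the current Unix time; injectable for tests. */
    private $clock;

    public function __construct(?callable $clock = null)
    {
        $this->clock = $clock ?? 'time';
    }

    public function fetch(string $key)
    {
        $this->evictIfExpired($key);
        return $this->items[$key]['value'] ?? null;
    }

    public function save(string $key, $value, int $ttl = 0): bool
    {
        $this->items[$key] = [
            'value' => $value,
            'expiresAt' => $ttl > 0 ? ($this->clock)() + $ttl : null,
        ];
        return true;
    }

    public function delete(string $key): bool
    {
        $this->evictIfExpired($key);
        $existed = isset($this->items[$key]);
        unset($this->items[$key]);
        return $existed;
    }

    public function increment(string $key, int $ttl): int
    {
        $this->evictIfExpired($key);
        if (!isset($this->items[$key])) {
            // The expiry is fixed by the first increment and never extended.
            $this->save($key, 0, $ttl);
        }
        return ++$this->items[$key]['value'];
    }

    private function evictIfExpired(string $key): void
    {
        $expiresAt = $this->items[$key]['expiresAt'] ?? null;
        if ($expiresAt !== null && ($this->clock)() >= $expiresAt) {
            unset($this->items[$key]);
        }
    }
}
```

Passing a closure as the clock lets a test move time forward and check that a failure window really ends, without calling sleep().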

Next comes the concrete Redis implementation. In a real project we would use the phpredis extension or the predis/predis library. Here we assume phpredis.

<?php

declare(strict_types=1);

namespace ResilienceKit\Storage;

use Redis;
use RedisException;

class RedisAdapter implements StorageAdapter
{
    private Redis $redis;
    private string $keyPrefix;

    /**
     * @param Redis $redis An already connected Redis client instance.
     * @param string $keyPrefix Prefix for all keys to avoid collisions.
     */
    public function __construct(Redis $redis, string $keyPrefix = 'resilience_kit:cb:')
    {
        $this->redis = $redis;
        $this->keyPrefix = $keyPrefix;
    }

    private function getPrefixedKey(string $key): string
    {
        return $this->keyPrefix . $key;
    }

    public function fetch(string $key)
    {
        try {
            // Redis returns false on failure or if key does not exist.
            $value = $this->redis->get($this->getPrefixedKey($key));
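            // Only arrays and scalars are stored, so refuse to instantiate objects.
            return $value === false ? null : unserialize($value, ['allowed_classes' => false]);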
        } catch (RedisException $e) {
            // In a production scenario, log this error but return null to prevent failure.
            // The circuit breaker should ideally fail safe (act as closed).
            error_log('Redis fetch failed: ' . $e->getMessage());
            return null;
        }
    }

    public function save(string $key, $value, int $ttl = 0): bool
    {
        try {
            $serializedValue = serialize($value);
            if ($ttl > 0) {
                return $this->redis->setex($this->getPrefixedKey($key), $ttl, $serializedValue);
            }
            return $this->redis->set($this->getPrefixedKey($key), $serializedValue);
        } catch (RedisException $e) {
            error_log('Redis save failed: ' . $e->getMessage());
            return false;
        }
    }

    public function delete(string $key): bool
    {
        try {
            return $this->redis->del($this->getPrefixedKey($key)) > 0;
        } catch (RedisException $e) {
            error_log('Redis delete failed: ' . $e->getMessage());
            return false;
        }
    }

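    public function increment(string $key, int $ttl): int
    {
        $prefixedKey = $this->getPrefixedKey($key);
        try {
            // Run both commands in one MULTI/EXEC transaction so the counter
            // can never exist without an expiry.
            $this->redis->multi();
            // SET ... NX EX creates the counter with its TTL only if the key is missing,
            // so the window starts at the first failure and is not extended by later ones.
            $this->redis->set($prefixedKey, '0', ['nx', 'ex' => $ttl]);
            // INCR leaves an existing TTL untouched.
            $this->redis->incr($prefixedKey);
            $result = $this->redis->exec();

            // exec() returns one result per queued command (or false on failure).
            // The INCR result is the second element.
            return (int) ($result[1] ?? 0);
        } catch (RedisException $e) {
            error_log('Redis increment failed: ' . $e->getMessage());
            return 0; // Fail safe
        }
    }
}

The increment implementation deserves attention. We use a Redis MULTI/EXEC transaction so that creating the counter, setting its expiry, and incrementing it are applied as one atomic unit. This avoids a race condition: if a process ran INCR and then crashed before EXPIRE, the counter would exist forever. Setting the TTL only when the key is first created also keeps the window fixed, as the interface documents; calling EXPIRE on every increment would push the expiry back with each new failure, and under a steady trickle of errors the counter would never reset.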

2. The Core State Machine: CircuitBreaker

Now for the core logic. A circuit breaker has three states: CLOSED, OPEN, and HALF_OPEN. On PHP 8.1+, an enum is a natural way to represent them.

<?php
declare(strict_types=1);

namespace ResilienceKit\CircuitBreaker;

enum State: string
{
    case CLOSED = 'closed';
    case OPEN = 'open';
    case HALF_OPEN = 'half_open';
}

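Next is the CircuitBreaker class itself. It is responsible for managing state transitions, counting failures, and executing the protected operation.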

<?php

declare(strict_types=1);

namespace ResilienceKit\CircuitBreaker;

use ResilienceKit\Storage\StorageAdapter;
use Throwable;

class CircuitBreaker
{
    private const KEY_STATE = 'state';
    private const KEY_FAILURES = 'failures';
    private const KEY_LAST_TEST = 'last_test'; // For HALF_OPEN state

    private string $serviceName;
    private StorageAdapter $storage;
    private Settings $settings;

    public function __construct(string $serviceName, Settings $settings, StorageAdapter $storage)
    {
        $this->serviceName = $serviceName;
        $this->settings = $settings;
        $this->storage = $storage;
    }
    
    public function getName(): string
    {
        return $this->serviceName;
    }
    
    public function getCurrentState(): State
    {
        $stateData = $this->storage->fetch($this->getStateStorageKey());
        return $stateData ? State::from($stateData['state']) : State::CLOSED;
    }

    /**
     * Wraps the operation that needs to be protected.
     *
     * @param callable $operation The operation to execute.
     * @return mixed The result of the operation.
     * @throws CircuitBreakerOpenException If the circuit is open.
     * @throws Throwable Re-throws the exception from the operation on failure.
     */
    public function call(callable $operation)
    {
        if ($this->isOpen()) {
            throw new CircuitBreakerOpenException("Circuit breaker for '{$this->serviceName}' is open.");
        }

        try {
            $result = $operation();
            $this->onSuccess();
            return $result;
        } catch (Throwable $e) {
            // We should not trip the breaker for certain types of exceptions, e.g., validation errors (4xx).
            // This logic can be customized. Here we assume all exceptions are failures.
            $this->onFailure();
            throw $e;
        }
    }

    private function isOpen(): bool
    {
        $stateData = $this->storage->fetch($this->getStateStorageKey());
        if (!$stateData) {
            return false; // Default to closed
        }

        $state = State::from($stateData['state']);

        if ($state === State::OPEN) {
            $openedAt = $stateData['timestamp'];
            // Check if the reset timeout has passed
            if (time() > $openedAt + $this->settings->getResetTimeout()) {
                $this->transitionTo(State::HALF_OPEN);
                return false; // Allow one trial call
            }
            return true;
        }

        return false;
    }

    private function onSuccess(): void
    {
        // If the trial call in HALF_OPEN state succeeds, close the circuit.
        // In CLOSED state a success changes nothing: the failure counter is left
        // to expire with its window. Deleting it here would turn the threshold
        // into "N consecutive failures" rather than "N failures per window".
        if ($this->getCurrentState() === State::HALF_OPEN) {
            $this->reset();
        }
    }

    private function onFailure(): void
    {
        $state = $this->getCurrentState();

        if ($state === State::HALF_OPEN) {
            // If the trial call fails, re-open the circuit immediately for another reset timeout period.
            $this->transitionTo(State::OPEN);
            return;
        }
        
        // In CLOSED state, we increment the failure counter.
        $failureCount = $this->storage->increment(
            $this->getFailuresStorageKey(),
            $this->settings->getFailureRateWindow()
        );

        if ($failureCount >= $this->settings->getFailureThreshold()) {
            $this->transitionTo(State::OPEN);
        }
    }
    
    private function transitionTo(State $newState): void
    {
        $stateData = [
            'state' => $newState->value,
            'timestamp' => time(),
        ];
        
        $ttl = 0; // State should persist by default
        if ($newState === State::OPEN) {
            // The state itself needs to persist longer than the reset timeout.
            // A good practice is to set TTL to a multiple of the reset timeout.
            $ttl = $this->settings->getResetTimeout() * 2;
        }

        $this->storage->save($this->getStateStorageKey(), $stateData, $ttl);
    }
    
    public function reset(): void
    {
        $this->storage->delete($this->getStateStorageKey());
        $this->storage->delete($this->getFailuresStorageKey());
    }

    private function getStateStorageKey(): string
    {
        return "{$this->serviceName}:state";
    }

    private function getFailuresStorageKey(): string
    {
        return "{$this->serviceName}:failures";
    }
}

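This class contains the core circuit breaker logic. The call() method is the entry point. It first checks the state via isOpen(); if the circuit is open and the reset timeout has not yet elapsed, it throws immediately without running the operation, which takes load off the struggling downstream service and keeps our own workers from blocking on it. If the circuit is closed or half-open, it executes the business operation. On success it calls onSuccess(), which closes the circuit again if this was a half-open trial; on failure it calls onFailure(), which increments the failure count and may move the circuit to OPEN.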
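
The listing throws CircuitBreakerOpenException, and the usage code later imports it from the ResilienceKit\CircuitBreaker namespace, but the class itself is not shown. A minimal definition might look like the sketch below. Extending RuntimeException is an assumption; any Throwable that callers can catch separately from the operation's own exceptions would serve.

```php
<?php

declare(strict_types=1);

namespace ResilienceKit\CircuitBreaker;

use RuntimeException;

/**
 * Thrown by CircuitBreaker::call() when the circuit is open and the call is rejected.
 * The parent class is an assumption; the article does not show this definition.
 */
final class CircuitBreakerOpenException extends RuntimeException
{
}
```

Keeping it a distinct type is what lets a caller catch the rejected-call case first and return a fallback, while still handling real downstream failures in a separate catch block.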

The Settings class is a simple data object used to pass configuration:

<?php
declare(strict_types=1);

namespace ResilienceKit\CircuitBreaker;

class Settings
{
    private int $failureThreshold; // e.g., 5 failures
    private int $failureRateWindow; // e.g., 60 seconds
    private int $resetTimeout; // e.g., 30 seconds (time in OPEN state)

    public function __construct(int $failureThreshold, int $failureRateWindow, int $resetTimeout)
    {
        $this->failureThreshold = $failureThreshold;
        $this->failureRateWindow = $failureRateWindow;
        $this->resetTimeout = $resetTimeout;
    }
    
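    public function getFailureThreshold(): int
    {
        return $this->failureThreshold;
    }

    public function getFailureRateWindow(): int
    {
        return $this->failureRateWindow;
    }

    public function getResetTimeout(): int
    {
        return $this->resetTimeout;
    }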
}

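3. Integrating Structured Logging

For observability we need a Monolog Processor. This Processor adds the current state of each registered circuit breaker to every log record.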

<?php

declare(strict_types=1);

namespace ResilienceKit\Logging;

use Monolog\LogRecord;
use Monolog\Processor\ProcessorInterface;
use ResilienceKit\CircuitBreaker\CircuitBreaker;

class CircuitBreakerLogProcessor implements ProcessorInterface
{
    /** @var CircuitBreaker[] */
    private array $circuitBreakers = [];

    public function addCircuitBreaker(CircuitBreaker $cb): void
    {
        $this->circuitBreakers[$cb->getName()] = $cb;
    }
    
    public function __invoke(LogRecord $record): LogRecord
    {
        if (empty($this->circuitBreakers)) {
            return $record;
        }
        
        $context = $record->extra;
        $cbStates = [];
        
        foreach ($this->circuitBreakers as $name => $cb) {
            // This might involve a Redis call. In high-performance scenarios,
            // you might want to cache this state for the duration of the request.
            $cbStates[$name] = $cb->getCurrentState()->value;
        }
        
        $context['circuit_breakers'] = $cbStates;
        
        return $record->with(extra: $context);
    }
}

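This Processor's design involves a trade-off. Calling $cb->getCurrentState() for every log record means one Redis query per registered breaker per record. In a request that writes many log lines, this can become a noticeable overhead. A common optimization is to fetch the state once at the start of the request and cache it in a member variable of the Processor. For simplicity and freshness of the data, we keep the live query here.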
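
The caching variant could be sketched as follows. RequestScopedStateCache is a hypothetical helper, not part of the Kit; it wraps any callable that returns a state string for a breaker name, so the Processor would call the cache instead of the breaker directly.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: memoizes breaker states for the lifetime of one request.
 * $reader is any callable that returns the state string for a breaker name,
 * e.g. fn (string $name) => $breakers[$name]->getCurrentState()->value.
 */
final class RequestScopedStateCache
{
    /** @var callable(string): string */
    private $reader;

    /** @var array<string, string> */
    private array $states = [];

    public function __construct(callable $reader)
    {
        $this->reader = $reader;
    }

    public function get(string $name): string
    {
        // Only the first lookup per breaker reaches the backing store.
        return $this->states[$name] ??= ($this->reader)($name);
    }

    /** Drops cached values, e.g. right after call() so a state change shows up in later logs. */
    public function clear(): void
    {
        $this->states = [];
    }
}
```

The cost of this design is staleness: a record logged after the breaker trips still reports the old state unless clear() is called, which is the freshness trade-off described above.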

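The Final Result: Integration and Usage

Now we put all the pieces together. Suppose we have a GeoServiceClient that is responsible for calling that unstable third-party API.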

<?php

use Monolog\Logger;
use Monolog\Handler\StreamHandler;
use Monolog\Formatter\JsonFormatter;
use ResilienceKit\CircuitBreaker\CircuitBreaker;
use ResilienceKit\CircuitBreaker\CircuitBreakerOpenException;
use ResilienceKit\CircuitBreaker\Settings;
use ResilienceKit\Logging\CircuitBreakerLogProcessor;
use ResilienceKit\Storage\RedisAdapter;

// 1. Setup Dependencies
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

$storageAdapter = new RedisAdapter($redis, 'app_name:cb:');
$cbLogProcessor = new CircuitBreakerLogProcessor();

$logger = new Logger('api_requests');
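// The processor above uses LogRecord, i.e. Monolog 3, where Level::Debug replaces the deprecated Logger::DEBUG.
$handler = new StreamHandler('php://stdout', \Monolog\Level::Debug);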
$handler->setFormatter(new JsonFormatter());
$logger->pushHandler($handler);
$logger->pushProcessor($cbLogProcessor);


// 2. Configure and instantiate the Circuit Breaker
$geoApiSettings = new Settings(
    failureThreshold: 10,       // 10 failures
    failureRateWindow: 60,      // in a 60 second window
    resetTimeout: 30            // will trip the breaker for 30 seconds
);
$geoApiCircuitBreaker = new CircuitBreaker(
    'geo_api_service',
    $geoApiSettings,
    $storageAdapter
);

// Register the circuit breaker with the log processor for observability
$cbLogProcessor->addCircuitBreaker($geoApiCircuitBreaker);


// 3. The service client that uses the circuit breaker
class GeoServiceClient
{
    private CircuitBreaker $circuitBreaker;
    private Logger $logger;

    public function __construct(CircuitBreaker $circuitBreaker, Logger $logger)
    {
        $this->circuitBreaker = $circuitBreaker;
        $this->logger = $logger;
    }

    public function getLocation(string $ip): ?array
    {
        try {
            return $this->circuitBreaker->call(function () use ($ip) {
                $this->logger->info('Calling Geo API', ['ip' => $ip]);
                
                // Simulate a call to a flaky service
                if (rand(1, 10) > 3) { // 70% chance of success
                    // Simulate network latency
                    usleep(rand(50000, 200000));
                    return ['city' => 'New York', 'country' => 'USA'];
                } else {
                    throw new \RuntimeException("Geo API timed out");
                }
            });
        } catch (CircuitBreakerOpenException $e) {
            $this->logger->warning('Circuit breaker is open for Geo API.', [
                'error' => $e->getMessage()
            ]);
            // Return a fallback value, null, or a cached response
            return ['city' => 'Default', 'country' => 'N/A'];
        } catch (\Throwable $e) {
            $this->logger->error('Failed to call Geo API.', [
                'error' => $e->getMessage(),
                'ip' => $ip
            ]);
            // Re-throwing is an option, but often you want to handle it and provide fallback
            return null;
        }
    }
}


// 4. Usage in application context
$client = new GeoServiceClient($geoApiCircuitBreaker, $logger);

for ($i = 0; $i < 50; $i++) {
    echo "Request #{$i}: ";
    $location = $client->getLocation('8.8.8.8');
    if ($location) {
        echo "Got location: {$location['city']}\n";
    } else {
        echo "Failed to get location.\n";
    }
    sleep(1);
}

Running this simulation produces log output like the following (JSON format):
On success:
{"message":"Calling Geo API","context":{"ip":"8.8.8.8"},"level":200,"level_name":"INFO",...,"extra":{"circuit_breakers":{"geo_api_service":"closed"}}}

Once the circuit breaker is open, calls fail immediately:
{"message":"Circuit breaker is open for Geo API.","context":{"error":"Circuit breaker for 'geo_api_service' is open."},"level":300,"level_name":"WARNING",...,"extra":{"circuit_breakers":{"geo_api_service":"open"}}}

When the service call fails:
{"message":"Failed to call Geo API.","context":{"error":"Geo API timed out","ip":"8.8.8.8"},"level":400,"level_name":"ERROR",...,"extra":{"circuit_breakers":{"geo_api_service":"closed"}}}

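The extra.circuit_breakers field is the key to our observability. We can easily ship these JSON logs to a platform such as Elasticsearch or Loki, then build dashboards to monitor the state of each circuit breaker, or trigger an alert when a breaker's state changes to open.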
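
To make the alerting idea concrete, here is a small sketch of the check an alert rule would perform on each log line. The openBreakers function is hypothetical, and it assumes the record shape shown in the samples, where extra.circuit_breakers maps breaker names to state strings. In practice this logic would more likely live in a query on the log platform than in PHP.

```php
<?php

declare(strict_types=1);

/**
 * Returns the names of all breakers reported as "open" in one JSON log line.
 * Assumes extra.circuit_breakers is a map of breaker name => state string.
 *
 * @return list<string>
 */
function openBreakers(string $jsonLine): array
{
    $record = json_decode($jsonLine, true);
    $states = is_array($record) ? ($record['extra']['circuit_breakers'] ?? []) : [];
    if (!is_array($states)) {
        return [];
    }

    // Keep only the entries whose state is exactly "open", then return their names.
    return array_keys(array_filter($states, static fn ($state): bool => $state === 'open'));
}
```

Lines that are not valid JSON, or that carry no breaker data, simply yield an empty list, so the function can be run over a mixed log stream.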

Circuit Breaker State Transition Diagram

To understand the state machine more clearly, we can represent it as a Mermaid diagram:

stateDiagram-v2
    [*] --> CLOSED: Initial State
    
    CLOSED --> OPEN: Failure threshold reached
    note right of CLOSED
        Failure count >= threshold
        within the time window.
    end note

    OPEN --> HALF_OPEN: Reset timeout expired
    note left of OPEN
        Allows a single trial
        request to pass through.
    end note
    
    HALF_OPEN --> CLOSED: Trial request succeeds
    note left of HALF_OPEN
        Resets failure counter.
        Normal operation resumes.
    end note
    
    HALF_OPEN --> OPEN: Trial request fails
    note right of HALF_OPEN
        Immediately re-opens the
        circuit for another reset
        timeout period.
    end note

    CLOSED --> CLOSED: Request succeeds
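
The same transitions can be written as a pure function, which is convenient for table-driven tests. This is only a sketch: the Event enum and the nextState function are hypothetical names introduced here, and the State enum is repeated so the snippet runs on its own (PHP 8.1+).

```php
<?php

declare(strict_types=1);

enum State: string
{
    case CLOSED = 'closed';
    case OPEN = 'open';
    case HALF_OPEN = 'half_open';
}

// Hypothetical event names mirroring the arrows in the diagram.
enum Event
{
    case RequestSucceeded;
    case FailureThresholdReached;
    case ResetTimeoutExpired;
    case TrialSucceeded;
    case TrialFailed;
}

/** Returns the state that follows $current when $event occurs. */
function nextState(State $current, Event $event): State
{
    return match (true) {
        $current === State::CLOSED && $event === Event::FailureThresholdReached => State::OPEN,
        $current === State::OPEN && $event === Event::ResetTimeoutExpired => State::HALF_OPEN,
        $current === State::HALF_OPEN && $event === Event::TrialSucceeded => State::CLOSED,
        $current === State::HALF_OPEN && $event === Event::TrialFailed => State::OPEN,
        // Any pair not drawn in the diagram leaves the state unchanged.
        default => $current,
    };
}
```

Treating undrawn pairs as no-ops is a design choice made for this sketch; a stricter version could throw on them to catch logic errors early.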

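Limitations and Future Iterations

The Kit we built solves the core problem, but in a real, complex production environment there are still several things to improve:

  1. Static configuration: the breaker's thresholds and timeouts are currently hard-coded or loaded from a config file at startup. A more advanced system should support dynamic configuration, letting operators adjust policy in real time through a configuration center (such as Nacos or Consul) without redeploying the application, for example temporarily raising the failure threshold for a downstream service during a major sales event.

  2. Single breaker granularity: our breaker is keyed on a single serviceName. In a microservice architecture, however, one service may expose several APIs whose stability and importance differ. Finer-grained breaking may be needed, for example keyed on the combination serviceName + apiEndpoint.

  3. No request coalescing: when the breaker is in the HALF_OPEN state, in theory only one request should be let through as a "probe". Under the PHP-FPM model, because processes do not communicate with each other, several processes may decide at the same moment that they can send a trial request. This does not affect the end result (as soon as one fails, the state switches straight back to OPEN), but it is not a strict "single probe request". In long-running applications such as Swoole or Workerman, atomic locks or in-memory tables allow more precise control.

  4. Only one resilience pattern: so far only the circuit breaker is implemented. A complete resilience toolkit should also include a Rate Limiter, to keep a service from being overwhelmed by too many requests; a Bulkhead, which isolates resources (such as dedicated database connection pools) so that a failure in one feature does not affect the whole application; and a Retry mechanism combined with exponential backoff. All of these could be added to the Kit as new components.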
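
On the single-probe point (item 3), one possible direction that stays within PHP-FPM is to reuse the atomic counter the storage layer already provides: whichever process moves a dedicated probe counter from 0 to 1 wins the trial slot. The sketch below is an assumption-laden illustration, not part of the Kit. The tryAcquireProbe function and the ":probe" key suffix are hypothetical, and the approach relies on increment() being atomic and applying the TTL when the key is first created.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical probe gate for the HALF_OPEN state.
 *
 * $increment is any callable with the same contract as StorageAdapter::increment():
 * it atomically increments the key, applies the TTL, and returns the new value.
 * Only the caller that moves the counter from 0 to 1 wins the probe slot.
 */
function tryAcquireProbe(callable $increment, string $serviceName, int $probeTtl): bool
{
    return $increment("{$serviceName}:probe", $probeTtl) === 1;
}
```

With the adapter from this article it could be called as tryAcquireProbe([$storageAdapter, 'increment'], 'geo_api_service', 5), and callers that lose the race would be rejected as if the circuit were still open. Note that the RedisAdapter returns 0 when Redis is unreachable, so in that case this gate would refuse every probe; whether that is the desired failure mode needs to be decided explicitly.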

