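The root cause of our most recent production incident was a third-party geolocation API that one of our core services depends on synchronously. During peak traffic, the API's response time jumped from 50 ms to 5 s, accompanied by a large number of timeouts. As the caller, our service quickly exhausted its PHP-FPM worker pool and CPU usage spiked. Eventually the failure cascaded through the entire user-authentication chain. The post-mortem conclusion was clear: our service had no effective isolation from its downstream dependencies. Through synchronous calls, a failure at a single point was amplified and propagated across the whole system.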
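In a stateless, request-response environment like PHP, the key to this kind of problem is a circuit breaker whose state is shared across processes and requests. A simple in-memory circuit breaker is useless here. PHP discards all userland state at the end of every request, and separate PHP-FPM workers do not share memory, so nothing the breaker learns would survive. We therefore had to design a stateful circuit breaker backed by external storage. The goal was not just to make it work. We wanted a reusable, configurable, and observable resilience toolkit (the Resilience Kit).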
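Initial design and technology choices
Our circuit breaker toolkit (the Kit) had to meet a few core requirements:
- Externalized state storage: the state must live outside the PHP process.
- High performance: reading and writing the state must not become a new bottleneck.
- Atomic operations: counting and state changes must be atomic to avoid inconsistent data under concurrency.
- Easy integration: it must be applicable to existing business logic with minimal code intrusion.
- Strong observability: every state change and every successful or failed call should produce a structured log entry for monitoring and alerting.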
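Given these requirements, the technology choices were fairly clear:
- State storage: Redis was the obvious choice. It executes commands on a single thread and offers a rich set of atomic commands (such as INCR, SETEX, and GET), which fit our needs well. Compared with a database it offers lower latency, and compared with file locks it avoids complex concurrency control.
- Logging: our team has fully adopted the PSR-3 standard, and Monolog is the de facto standard implementation. The Kit should not reinvent the wheel. Instead it provides a Monolog processor that automatically injects circuit breaker context (such as the breaker name and current state) into log records.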
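The core of the Kit consists of three parts:
- A flexible storage interface (StorageAdapter) and its Redis implementation.
- The core CircuitBreaker class, which encapsulates the state machine.
- A CircuitBreakerLogProcessor for integrating with Monolog.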
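Step-by-step implementation: from interface to finished product
We start by defining the interface for the storage adapter. This is good practice because it forces us to program against an interface. If we later need a different store (for example Memcached or etcd), swapping it in is straightforward.
1. Storage abstraction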
<?php
declare(strict_types=1);
namespace ResilienceKit\Storage;
/**
* Interface StorageAdapter.
* Defines the contract for storing and retrieving circuit breaker state.
*/
interface StorageAdapter
{
/**
* Fetches a value from storage.
*
* @param string $key
* @return mixed
*/
public function fetch(string $key);
/**
* Saves a value to storage.
*
* @param string $key
* @param mixed $value
* @param int $ttl Seconds until expiry. 0 means it never expires.
* @return bool
*/
public function save(string $key, $value, int $ttl = 0): bool;
/**
* Deletes a value from storage.
*
* @param string $key
* @return bool
*/
public function delete(string $key): bool;
/**
* Increments a value atomically.
* If the key does not exist, it is set to 0 before performing the operation.
*
* @param string $key
* @param int $ttl Seconds until expiry, to be set on first increment.
* @return int The value of key after the increment.
*/
public function increment(string $key, int $ttl): int;
}
The interface defines four basic operations: fetch, save, delete, and increment. increment is the critical one. It accumulates the failure count, so it must be atomic.
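Because the breaker only depends on this interface, it can be unit-tested without Redis. The sketch below is a hypothetical in-memory adapter. The class name InMemoryAdapter and the injectable clock are assumptions and are not part of the Kit. It tries to honour the same contract: fetch returns null for a missing or expired key, and increment sets the TTL only on the first increment. It is safe only within a single process, so treat it as a test double, not a production store. The interface is repeated so the sketch runs on its own.

```php
<?php
declare(strict_types=1);

// Same contract as ResilienceKit\Storage\StorageAdapter, repeated so this sketch is standalone.
interface StorageAdapter
{
    public function fetch(string $key);
    public function save(string $key, $value, int $ttl = 0): bool;
    public function delete(string $key): bool;
    public function increment(string $key, int $ttl): int;
}

// Hypothetical test double: single-process only, NOT atomic across PHP-FPM workers.
final class InMemoryAdapter implements StorageAdapter
{
    /** @var array<string, array{value: mixed, expiresAt: ?int}> */
    private array $items = [];
    private Closure $clock;

    public function __construct(?Closure $clock = null)
    {
        // Injectable clock so tests can simulate the passage of time without sleeping.
        $this->clock = $clock ?? static fn (): int => time();
    }

    public function fetch(string $key)
    {
        $item = $this->items[$key] ?? null;
        if ($item === null) {
            return null;
        }
        if ($item['expiresAt'] !== null && ($this->clock)() >= $item['expiresAt']) {
            unset($this->items[$key]); // Expired: forget the key, as Redis would.
            return null;
        }
        return $item['value'];
    }

    public function save(string $key, $value, int $ttl = 0): bool
    {
        $expiresAt = $ttl > 0 ? ($this->clock)() + $ttl : null; // 0 = never expires
        $this->items[$key] = ['value' => $value, 'expiresAt' => $expiresAt];
        return true;
    }

    public function delete(string $key): bool
    {
        $existed = $this->fetch($key) !== null;
        unset($this->items[$key]);
        return $existed;
    }

    public function increment(string $key, int $ttl): int
    {
        $current = $this->fetch($key);
        if ($current === null) {
            // First increment of a window: start at 1 and set the TTL exactly once.
            return $this->save($key, 1, $ttl) ? 1 : 0;
        }
        // Later increments keep the original expiry, which gives a fixed window.
        $this->items[$key]['value'] = (int) $current + 1;
        return $this->items[$key]['value'];
    }
}
```

With the clock injected, a test can advance time and check that the failure window expires, without calling sleep().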
Next comes the Redis implementation. A real project would use either the phpredis extension or the predis/predis library. Here we assume phpredis.
<?php
declare(strict_types=1);
namespace ResilienceKit\Storage;
use Redis;
use RedisException;
class RedisAdapter implements StorageAdapter
{
private Redis $redis;
private string $keyPrefix;
/**
* @param Redis $redis An already connected Redis client instance.
* @param string $keyPrefix Prefix for all keys to avoid collisions.
*/
public function __construct(Redis $redis, string $keyPrefix = 'resilience_kit:cb:')
{
$this->redis = $redis;
$this->keyPrefix = $keyPrefix;
}
private function getPrefixedKey(string $key): string
{
return $this->keyPrefix . $key;
}
public function fetch(string $key)
{
try {
// Redis returns false on failure or if key does not exist.
$value = $this->redis->get($this->getPrefixedKey($key));
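// The breaker only stores arrays of scalars, so refuse to instantiate objects on read.
return $value === false ? null : unserialize($value, ['allowed_classes' => false]);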
} catch (RedisException $e) {
// In a production scenario, log this error but return null to prevent failure.
// The circuit breaker should ideally fail safe (act as closed).
error_log('Redis fetch failed: ' . $e->getMessage());
return null;
}
}
public function save(string $key, $value, int $ttl = 0): bool
{
try {
$serializedValue = serialize($value);
if ($ttl > 0) {
return $this->redis->setex($this->getPrefixedKey($key), $ttl, $serializedValue);
}
return $this->redis->set($this->getPrefixedKey($key), $serializedValue);
} catch (RedisException $e) {
error_log('Redis save failed: ' . $e->getMessage());
return false;
}
}
public function delete(string $key): bool
{
try {
return $this->redis->del($this->getPrefixedKey($key)) > 0;
} catch (RedisException $e) {
error_log('Redis delete failed: ' . $e->getMessage());
return false;
}
}
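public function increment(string $key, int $ttl): int
{
$prefixedKey = $this->getPrefixedKey($key);
// INCR the counter and set its TTL only when INCR returns 1 (the first failure of a window).
// Redis runs a Lua script atomically, so no other command can interleave and a crashed
// PHP process cannot leave a counter without a TTL.
$script = <<<'LUA'
local current = redis.call('INCR', KEYS[1])
if current == 1 then
redis.call('EXPIRE', KEYS[1], ARGV[1])
end
return current
LUA;
try {
// phpredis: eval(script, args, numKeys); the first numKeys entries of args become KEYS.
$result = $this->redis->eval($script, [$prefixedKey, $ttl], 1);
return is_int($result) ? $result : 0; // false means the script failed
} catch (RedisException $e) {
error_log('Redis increment failed: ' . $e->getMessage());
return 0; // Fail safe: never trip the breaker just because Redis is unavailable
}
}
}
This increment implementation deserves a closer look. The interface promises that the TTL is set on the first increment, and that detail gives failureRateWindow its meaning. Queuing INCR and EXPIRE in a MULTI/EXEC transaction would be atomic, but it would refresh the TTL on every failure. As long as failures kept arriving less than one window apart, the counter would never expire and would keep accumulating. The Lua script instead calls EXPIRE only when INCR returns 1, so the window is fixed and starts at the first failure. Because Redis executes a script atomically, this also rules out a race: a process crashing after INCR but before EXPIRE and leaving behind a counter that never expires.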
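The choice between refreshing the TTL on every increment and setting it only on the first one changes what failureRateWindow actually means. The standalone simulation below uses no Redis. The function simulateFailures and the failure timeline are made up for illustration. It replays one failure every 20 seconds against a 60-second window under both policies.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: replays failure timestamps against a counter with a TTL
 * and returns the counter value observed right after each failure.
 *
 * @param int[] $failureTimes Seconds at which failures happen (ascending).
 * @param bool  $refreshTtl   true = EXPIRE on every increment, false = only on the first one.
 * @return int[]
 */
function simulateFailures(array $failureTimes, int $window, bool $refreshTtl): array
{
    $count = 0;
    $expiresAt = null; // null means the counter key does not exist
    $observed = [];
    foreach ($failureTimes as $now) {
        if ($expiresAt !== null && $now >= $expiresAt) {
            $count = 0; // The key expired, just as Redis would drop it.
            $expiresAt = null;
        }
        $count++; // INCR
        if ($refreshTtl || $count === 1) {
            $expiresAt = $now + $window; // EXPIRE
        }
        $observed[] = $count;
    }
    return $observed;
}

$failures = [0, 20, 40, 60, 80, 100, 120]; // one failure every 20 seconds
echo implode(',', simulateFailures($failures, 60, true)), PHP_EOL;  // 1,2,3,4,5,6,7
echo implode(',', simulateFailures($failures, 60, false)), PHP_EOL; // 1,2,3,1,2,3,1
```

With a threshold of 5, the refresh-on-every-failure policy would trip the breaker at t=80 s, even though no 60-second interval ever contains more than three failures.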
2. The core state machine: CircuitBreaker
A circuit breaker has three states: CLOSED, OPEN, and HALF_OPEN. On PHP 8.1+, a backed enum is the idiomatic way to represent them.
<?php
declare(strict_types=1);
namespace ResilienceKit\CircuitBreaker;
enum State: string
{
case CLOSED = 'closed';
case OPEN = 'open';
case HALF_OPEN = 'half_open';
}
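A backed enum also makes the round trip through storage explicit. The breaker saves State::OPEN->value and restores it with State::from(). Note that from() throws a ValueError for an unknown string, while tryFrom() returns null. The sketch below uses a hypothetical restoreState helper to show how a corrupted or outdated value in Redis could fall back to CLOSED instead of failing the request. The enum is repeated so the sketch is standalone.

```php
<?php
declare(strict_types=1);

enum State: string
{
    case CLOSED = 'closed';
    case OPEN = 'open';
    case HALF_OPEN = 'half_open';
}

// Hypothetical helper: decode the array the breaker stores, defaulting to CLOSED
// when the key is missing or holds a value this enum does not know.
function restoreState(mixed $stateData): State
{
    if (!is_array($stateData) || !is_string($stateData['state'] ?? null)) {
        return State::CLOSED;
    }
    return State::tryFrom($stateData['state']) ?? State::CLOSED;
}

$stored = ['state' => State::OPEN->value, 'timestamp' => time()];
var_dump(restoreState($stored) === State::OPEN);                  // bool(true)
var_dump(restoreState(['state' => 'tripped']) === State::CLOSED); // bool(true)
var_dump(restoreState(null) === State::CLOSED);                   // bool(true)
```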
Next is the CircuitBreaker class itself. It manages state transitions and failure counting, and it executes the protected operation.
<?php
declare(strict_types=1);
namespace ResilienceKit\CircuitBreaker;
use ResilienceKit\Storage\StorageAdapter;
use Throwable;
class CircuitBreaker
{
private const KEY_STATE = 'state';
private const KEY_FAILURES = 'failures';
private string $serviceName;
private StorageAdapter $storage;
private Settings $settings;
public function __construct(string $serviceName, Settings $settings, StorageAdapter $storage)
{
$this->serviceName = $serviceName;
$this->settings = $settings;
$this->storage = $storage;
}
public function getName(): string
{
return $this->serviceName;
}
public function getCurrentState(): State
{
$stateData = $this->storage->fetch($this->getStateStorageKey());
return $stateData ? State::from($stateData['state']) : State::CLOSED;
}
/**
* Wraps the operation that needs to be protected.
*
* @param callable $operation The operation to execute.
* @return mixed The result of the operation.
* @throws CircuitBreakerOpenException If the circuit is open.
* @throws Throwable Re-throws the exception from the operation on failure.
*/
public function call(callable $operation)
{
if ($this->isOpen()) {
throw new CircuitBreakerOpenException("Circuit breaker for '{$this->serviceName}' is open.");
}
try {
$result = $operation();
$this->onSuccess();
return $result;
} catch (Throwable $e) {
// We should not trip the breaker for certain types of exceptions, e.g., validation errors (4xx).
// This logic can be customized. Here we assume all exceptions are failures.
$this->onFailure();
throw $e;
}
}
private function isOpen(): bool
{
$stateData = $this->storage->fetch($this->getStateStorageKey());
if (!$stateData) {
return false; // Default to closed
}
$state = State::from($stateData['state']);
if ($state === State::OPEN) {
$openedAt = $stateData['timestamp'];
// Check if the reset timeout has passed
if (time() > $openedAt + $this->settings->getResetTimeout()) {
$this->transitionTo(State::HALF_OPEN);
return false; // Allow one trial call
}
return true;
}
return false;
}
private function onSuccess(): void
{
if ($this->getCurrentState() === State::HALF_OPEN) {
// The trial call in HALF_OPEN state succeeded: close the circuit.
$this->reset();
}
// In CLOSED state nothing needs to change. The failure counter lives in a fixed
// window and expires on its own. Deleting it on every success would turn the
// threshold into "N consecutive failures" and add a Redis write to every call.
}
private function onFailure(): void
{
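$state = $this->getCurrentState();
if ($state === State::OPEN) {
// Another worker already tripped the breaker while this call was in flight.
// Saving OPEN again would only push the reset timeout further out.
return;
}
if ($state === State::HALF_OPEN) {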
// If the trial call fails, re-open the circuit immediately for another reset timeout period.
$this->transitionTo(State::OPEN);
return;
}
// In CLOSED state, we increment the failure counter.
$failureCount = $this->storage->increment(
$this->getFailuresStorageKey(),
$this->settings->getFailureRateWindow()
);
if ($failureCount >= $this->settings->getFailureThreshold()) {
$this->transitionTo(State::OPEN);
}
}
private function transitionTo(State $newState): void
{
$stateData = [
'state' => $newState->value,
'timestamp' => time(),
];
$ttl = 0; // State should persist by default
if ($newState === State::OPEN) {
// The state itself needs to persist longer than the reset timeout.
// A good practice is to set TTL to a multiple of the reset timeout.
$ttl = $this->settings->getResetTimeout() * 2;
}
$this->storage->save($this->getStateStorageKey(), $stateData, $ttl);
}
public function reset(): void
{
$this->storage->delete($this->getStateStorageKey());
$this->storage->delete($this->getFailuresStorageKey());
}
private function getStateStorageKey(): string
{
return "{$this->serviceName}:" . self::KEY_STATE;
}
private function getFailuresStorageKey(): string
{
return "{$this->serviceName}:" . self::KEY_FAILURES;
}
}
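This class contains the core logic of the circuit breaker, and call() is the entry point. It first checks the state via isOpen(). If the circuit is open and the reset timeout has not yet elapsed, call() throws immediately, which protects the downstream service. Once the reset timeout has elapsed, isOpen() moves the circuit to HALF_OPEN and lets the call through as a trial. When the circuit is closed or half-open, call() executes the business operation. On success it calls onSuccess(), which closes the circuit again if the call was a HALF_OPEN trial. On failure it calls onFailure(), which increments the failure counter and may move the circuit to OPEN. A failed HALF_OPEN trial re-opens the circuit immediately.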
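The sketch below exercises the transitions end to end without Redis. It re-implements the same state machine in condensed form, with plain properties and a fake clock. The names DemoBreaker, BreakerState and OpenCircuitException are hypothetical simplifications of CircuitBreaker, State and CircuitBreakerOpenException. Its own semantics are that the failure window starts at the first failure and a success in CLOSED leaves the counter alone. It is meant for reasoning about the state machine, not as a replacement for the class above.

```php
<?php
declare(strict_types=1);

enum BreakerState: string
{
    case CLOSED = 'closed';
    case OPEN = 'open';
    case HALF_OPEN = 'half_open';
}

final class OpenCircuitException extends RuntimeException
{
}

// Hypothetical condensed breaker: same transitions as CircuitBreaker, but the state
// lives in properties instead of Redis and time comes from an injected clock.
final class DemoBreaker
{
    private BreakerState $state = BreakerState::CLOSED;
    private int $openedAt = 0;
    private int $failures = 0;
    private int $windowEndsAt = 0;

    public function __construct(
        private int $failureThreshold,
        private int $failureRateWindow,
        private int $resetTimeout,
        private Closure $clock, // returns the current time in seconds
    ) {
    }

    public function state(): BreakerState
    {
        return $this->state;
    }

    public function call(callable $operation): mixed
    {
        $now = ($this->clock)();
        if ($this->state === BreakerState::OPEN) {
            if ($now <= $this->openedAt + $this->resetTimeout) {
                throw new OpenCircuitException('Circuit is open.');
            }
            $this->state = BreakerState::HALF_OPEN; // Reset timeout elapsed: allow a trial call.
        }
        try {
            $result = $operation();
        } catch (Throwable $e) {
            $this->onFailure($now);
            throw $e;
        }
        if ($this->state === BreakerState::HALF_OPEN) {
            $this->state = BreakerState::CLOSED; // Trial succeeded: close and clear the counter.
            $this->failures = 0;
        }
        return $result;
    }

    private function onFailure(int $now): void
    {
        if ($this->state === BreakerState::HALF_OPEN) {
            $this->open($now); // Trial failed: re-open for another reset timeout.
            return;
        }
        if ($this->failures === 0 || $now >= $this->windowEndsAt) {
            // First failure of a fixed window (like INCR + EXPIRE on the first increment).
            $this->failures = 0;
            $this->windowEndsAt = $now + $this->failureRateWindow;
        }
        if (++$this->failures >= $this->failureThreshold) {
            $this->open($now);
        }
    }

    private function open(int $now): void
    {
        $this->state = BreakerState::OPEN;
        $this->openedAt = $now;
    }
}

$now = 0;
$breaker = new DemoBreaker(3, 60, 30, function () use (&$now): int { return $now; });
$timeout = static function (): void { throw new RuntimeException('Geo API timed out'); };
for ($i = 0; $i < 3; $i++) {
    try { $breaker->call($timeout); } catch (RuntimeException) {}
}
echo $breaker->state()->value, PHP_EOL; // open
$now = 10;
try { $breaker->call(fn () => 'ok'); } catch (OpenCircuitException) { echo 'rejected', PHP_EOL; }
$now = 31;
echo $breaker->call(fn () => 'ok'), PHP_EOL; // ok (the trial call)
echo $breaker->state()->value, PHP_EOL;      // closed
```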
Settings is a simple data object that carries the configuration:
<?php
declare(strict_types=1);
namespace ResilienceKit\CircuitBreaker;
class Settings
{
public function __construct(
private int $failureThreshold, // e.g., 5 failures
private int $failureRateWindow, // e.g., 60 seconds
private int $resetTimeout, // e.g., 30 seconds (time spent in OPEN)
) {
}
public function getFailureThreshold(): int
{
return $this->failureThreshold;
}
public function getFailureRateWindow(): int
{
return $this->failureRateWindow;
}
public function getResetTimeout(): int
{
return $this->resetTimeout;
}
}
The CircuitBreakerOpenException thrown by call() is a plain marker exception in the same namespace:
<?php
declare(strict_types=1);
namespace ResilienceKit\CircuitBreaker;
class CircuitBreakerOpenException extends \RuntimeException
{
}
3. Integrating structured logging
For observability we need a Monolog processor that adds the current state of every registered circuit breaker to each log record.
<?php
declare(strict_types=1);
namespace ResilienceKit\Logging;
use Monolog\LogRecord;
use Monolog\Processor\ProcessorInterface;
use ResilienceKit\CircuitBreaker\CircuitBreaker;
class CircuitBreakerLogProcessor implements ProcessorInterface
{
/** @var CircuitBreaker[] */
private array $circuitBreakers = [];
public function addCircuitBreaker(CircuitBreaker $cb): void
{
$this->circuitBreakers[$cb->getName()] = $cb;
}
public function __invoke(LogRecord $record): LogRecord
{
if (empty($this->circuitBreakers)) {
return $record;
}
$extra = $record->extra; // Monolog's "extra" field, separate from the per-call "context"
$cbStates = [];
foreach ($this->circuitBreakers as $name => $cb) {
// This might involve a Redis call. In high-performance scenarios,
// you might want to cache this state for the duration of the request.
$cbStates[$name] = $cb->getCurrentState()->value;
}
$extra['circuit_breakers'] = $cbStates;
return $record->with(extra: $extra);
}
}
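This processor design involves a trade-off. Calling $cb->getCurrentState() for every log record means one Redis lookup per registered breaker per record. In a request that writes many log lines, this overhead adds up. A common optimization is to fetch the states once per request and cache them in a member of the processor. We kept the real-time lookup here for simplicity and freshness.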
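The per-request caching mentioned above could look roughly like this. Under PHP-FPM each request builds a fresh object graph, so a processor that memoizes states in a member array effectively caches them for one request. To keep the sketch runnable without Monolog or Redis, the state lookup is injected as a closure. The class name CachedStateResolver is hypothetical. Inside CircuitBreakerLogProcessor::__invoke() you would call resolve() instead of $cb->getCurrentState().

```php
<?php
declare(strict_types=1);

// Hypothetical helper: memoizes breaker states for the lifetime of this object,
// which under PHP-FPM is a single request.
final class CachedStateResolver
{
    /** @var array<string, string> */
    private array $cache = [];

    // $lookup performs the real (e.g. Redis) lookup: fn (string $name): string
    public function __construct(private Closure $lookup)
    {
    }

    public function resolve(string $name): string
    {
        // Only the first lookup per breaker reaches the backing store.
        return $this->cache[$name] ??= ($this->lookup)($name);
    }

    /** Drop cached values, e.g. after this request changed a breaker's state. */
    public function invalidate(): void
    {
        $this->cache = [];
    }
}

$lookups = 0;
$resolver = new CachedStateResolver(function (string $name) use (&$lookups): string {
    $lookups++;
    return 'closed'; // stand-in for $cb->getCurrentState()->value
});
foreach (range(1, 5) as $_) {
    $resolver->resolve('geo_api_service'); // five log records in one request
}
echo $lookups, PHP_EOL; // 1
```

The cost of caching is staleness. A log line written after the breaker trips in the same request may still show the old state unless invalidate() is called.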
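Final result: putting it all together
Now we wire all the pieces together. Suppose we have a GeoServiceClient that is responsible for calling the unreliable third-party API.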
<?php
use Monolog\Logger;
use Monolog\Level;
use Monolog\Handler\StreamHandler;
use Monolog\Formatter\JsonFormatter;
use ResilienceKit\CircuitBreaker\CircuitBreaker;
use ResilienceKit\CircuitBreaker\CircuitBreakerOpenException;
use ResilienceKit\CircuitBreaker\Settings;
use ResilienceKit\Logging\CircuitBreakerLogProcessor;
use ResilienceKit\Storage\RedisAdapter;
require __DIR__ . '/vendor/autoload.php'; // Composer autoloader; adjust the path to your project layout
// 1. Setup Dependencies
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$storageAdapter = new RedisAdapter($redis, 'app_name:cb:');
$cbLogProcessor = new CircuitBreakerLogProcessor();
$logger = new Logger('api_requests');
// Monolog 3 (which provides the LogRecord used by the processor) uses the Level enum
// in place of the deprecated Logger::DEBUG constants.
$handler = new StreamHandler('php://stdout', Level::Debug);
$handler->setFormatter(new JsonFormatter());
$logger->pushHandler($handler);
$logger->pushProcessor($cbLogProcessor);
// 2. Configure and instantiate the Circuit Breaker
$geoApiSettings = new Settings(
failureThreshold: 10, // 10 failures
failureRateWindow: 60, // in a 60 second window
resetTimeout: 30 // once tripped, stay OPEN for 30 seconds before a trial call
);
$geoApiCircuitBreaker = new CircuitBreaker(
'geo_api_service',
$geoApiSettings,
$storageAdapter
);
// Register the circuit breaker with the log processor for observability
$cbLogProcessor->addCircuitBreaker($geoApiCircuitBreaker);
// 3. The service client that uses the circuit breaker
class GeoServiceClient
{
private CircuitBreaker $circuitBreaker;
private Logger $logger;
public function __construct(CircuitBreaker $circuitBreaker, Logger $logger)
{
$this->circuitBreaker = $circuitBreaker;
$this->logger = $logger;
}
public function getLocation(string $ip): ?array
{
try {
return $this->circuitBreaker->call(function () use ($ip) {
$this->logger->info('Calling Geo API', ['ip' => $ip]);
// Simulate a call to a flaky service
if (rand(1, 10) > 3) { // 70% chance of success
// Simulate network latency
usleep(rand(50000, 200000));
return ['city' => 'New York', 'country' => 'USA'];
} else {
throw new \RuntimeException("Geo API timed out");
}
});
} catch (CircuitBreakerOpenException $e) {
$this->logger->warning('Circuit breaker is open for Geo API.', [
'error' => $e->getMessage()
]);
// Return a fallback value, null, or a cached response
return ['city' => 'Default', 'country' => 'N/A'];
} catch (\Throwable $e) {
$this->logger->error('Failed to call Geo API.', [
'error' => $e->getMessage(),
'ip' => $ip
]);
// Re-throwing is an option, but often you want to handle it and provide fallback
return null;
}
}
}
// 4. Usage in application context
$client = new GeoServiceClient($geoApiCircuitBreaker, $logger);
for ($i = 0; $i < 50; $i++) {
echo "Request #{$i}: ";
$location = $client->getLocation('8.8.8.8');
if ($location) {
echo "Got location: {$location['city']}\n";
} else {
echo "Failed to get location.\n";
}
sleep(1);
}
Running this simulation produces JSON log output like the following:
On success: {"message":"Calling Geo API","context":{"ip":"8.8.8.8"},"level":200,"level_name":"INFO",...,"extra":{"circuit_breakers":{"geo_api_service":"closed"}}}
Once the breaker is open, calls fail immediately without reaching the API: {"message":"Circuit breaker is open for Geo API.","context":{"error":"Circuit breaker for 'geo_api_service' is open."},"level":300,"level_name":"WARNING",...,"extra":{"circuit_breakers":{"geo_api_service":"open"}}}
When the service call fails: {"message":"Failed to call Geo API.","context":{"error":"Geo API timed out","ip":"8.8.8.8"},"level":400,"level_name":"ERROR",...,"extra":{"circuit_breakers":{"geo_api_service":"closed"}}}
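The extra.circuit_breakers field is the key to observability. These JSON logs can easily be shipped to platforms such as Elasticsearch or Loki. There you can build dashboards that track the state of each circuit breaker, or trigger an alert when a breaker's state changes to open.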
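The extra field shows the state at the moment each record is written. The requirements listed at the start also call for a log entry on every state change, and alert rules usually match on that. One option is for transitionTo() to invoke a listener with the old and new states. The sketch below is standalone, and the function name describeTransition, the event name circuit_breaker.state_changed, and the JSON shape are all assumptions. In the Kit, the listener would more likely pass the same fields as context to a PSR-3 logger than build JSON by hand.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical listener that turns one state transition into one structured log line.
 * Returns the JSON so it can be sent to any sink (stdout, a PSR-3 logger, ...).
 */
function describeTransition(string $breaker, string $from, string $to, int $timestamp): string
{
    $record = [
        'event' => 'circuit_breaker.state_changed',
        // Opening is the transition alerts usually fire on, so log it louder.
        'level_name' => $to === 'open' ? 'WARNING' : 'INFO',
        'context' => [
            'breaker' => $breaker,
            'from' => $from,
            'to' => $to,
            'at' => gmdate(DATE_ATOM, $timestamp),
        ],
    ];
    return json_encode($record, JSON_THROW_ON_ERROR);
}

echo describeTransition('geo_api_service', 'closed', 'open', 0), PHP_EOL;
```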
Circuit breaker state transition diagram
The state machine can be summarized with a Mermaid diagram:
stateDiagram-v2
[*] --> CLOSED: Initial State
CLOSED --> OPEN: Failure threshold reached
note right of CLOSED
    Failure count >= threshold within the time window.
end note
OPEN --> HALF_OPEN: Reset timeout expired
note left of OPEN
    Allows a single trial request to pass through.
end note
HALF_OPEN --> CLOSED: Trial request succeeds
note left of HALF_OPEN
    Resets failure counter. Normal operation resumes.
end note
HALF_OPEN --> OPEN: Trial request fails
note right of HALF_OPEN
    Immediately re-opens the circuit for another reset timeout period.
end note
CLOSED --> CLOSED: Request succeeds
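The diagram can also be written down as data. The sketch below encodes its edges in a lookup table, and the constant ALLOWED_TRANSITIONS and the function canTransition are hypothetical. A unit test could use canTransition() to assert that transitionTo() is never asked to take an edge the diagram does not contain, such as CLOSED directly to HALF_OPEN.

```php
<?php
declare(strict_types=1);

// Edges of the state diagram: from-state => allowed to-states.
const ALLOWED_TRANSITIONS = [
    'closed'    => ['closed', 'open'],
    'open'      => ['half_open'],
    'half_open' => ['closed', 'open'],
];

function canTransition(string $from, string $to): bool
{
    return in_array($to, ALLOWED_TRANSITIONS[$from] ?? [], true);
}

var_dump(canTransition('closed', 'open'));      // bool(true)
var_dump(canTransition('closed', 'half_open')); // bool(false)
```

The Redis TTL on the OPEN state key adds one implicit edge that the diagram leaves out. transitionTo() saves OPEN with a TTL of twice the reset timeout. If no request arrives before that TTL runs out, the key expires and the breaker reads as CLOSED without any trial call.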
Limitations and future iterations
The Kit solves the core problem, but a real, complex production environment leaves several areas for improvement:
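Static configuration: the breaker's thresholds and timeouts are currently hard-coded or loaded from a configuration file at startup. A more advanced system would support dynamic configuration. Operators could then adjust the policy in real time through a configuration center (such as Nacos or Consul) without redeploying the application. For example, they could temporarily raise the failure threshold for a downstream service during a major sales event.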
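Here is one rough way to layer dynamic overrides on top of static defaults. The sketch assumes that some process, such as a config-center client, writes per-breaker overrides into a shared store as an array, and that they are merged with the defaults on every request. The function name effectiveSettings, the override format, and the validation rule are hypothetical, and no Nacos or Consul API is used.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical merge of static defaults with runtime overrides for one breaker.
 * Unknown keys and non-positive or non-integer values are ignored, so a bad push
 * cannot silently disable the breaker.
 *
 * @param array<string, int>   $defaults
 * @param array<string, mixed> $overrides e.g. decoded JSON fetched from a config center
 * @return array<string, int>
 */
function effectiveSettings(array $defaults, array $overrides): array
{
    $result = $defaults;
    foreach ($overrides as $key => $value) {
        if (array_key_exists($key, $defaults) && is_int($value) && $value > 0) {
            $result[$key] = $value;
        }
    }
    return $result;
}

$defaults = ['failureThreshold' => 10, 'failureRateWindow' => 60, 'resetTimeout' => 30];
// During a sales event an operator raises the threshold; the misspelled key is ignored.
$overrides = ['failureThreshold' => 25, 'resetTimout' => 5];
echo json_encode(effectiveSettings($defaults, $overrides)), PHP_EOL;
// {"failureThreshold":25,"failureRateWindow":60,"resetTimeout":30}
```

The keys match the Settings constructor's parameter names, so on PHP 8.1+ the merged array could be unpacked as named arguments: new Settings(...$effective).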
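Single breaker granularity: our breaker is keyed by serviceName. In a microservice architecture, however, one service may expose several APIs whose stability and importance differ. Finer-grained breakers may be needed, for example keyed by the combination serviceName + apiEndpoint.
No probe coalescing: in the HALF_OPEN state, only one request should in theory pass through as a "probe". Under PHP-FPM, worker processes do not coordinate with each other in this design, so several processes may decide at the same moment that they can send a probe. The state still converges, because as soon as one probe fails the state switches back to OPEN. This is not strict single-request probing, though, and a recovering downstream may receive a burst of probe traffic. Long-running runtimes such as Swoole or Workerman allow more precise control through atomic locks or shared in-memory tables.
Only one resilience pattern: so far only the circuit breaker is implemented. A complete resilience toolkit should also include a rate limiter, which keeps the service from being overwhelmed by too many requests. It should offer bulkheads, which isolate resources (such as separate database connection pools) so that one failing feature cannot take down the whole application. It should also provide a retry mechanism combined with exponential backoff. All of these can be added to the Kit as new components.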
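Of those future components, retry with exponential backoff is the smallest to sketch. The version below uses hypothetical function names and defaults. It waits a random delay between 0 and min(cap, base * 2^attempt) milliseconds before each retry. This "full jitter" variant keeps many FPM workers from retrying in lockstep. The isRetryable predicate would typically reject CircuitBreakerOpenException and 4xx-style errors, because retrying those cannot succeed.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: delay before retry number $attempt (0-based), using exponential
 * backoff with full jitter: a random value in [0, min(cap, base * 2^attempt)].
 */
function backoffDelayMs(int $attempt, int $baseMs = 100, int $capMs = 2000): int
{
    $ceiling = (int) min($capMs, $baseMs * (2 ** $attempt));
    return random_int(0, $ceiling);
}

/**
 * Runs $operation up to $maxAttempts times. $isRetryable decides which failures are
 * worth retrying (e.g. timeouts, but not an open circuit or a validation error).
 */
function retry(callable $operation, int $maxAttempts, callable $isRetryable, ?callable $sleepMs = null): mixed
{
    $sleepMs ??= static function (int $ms): void { usleep($ms * 1000); };
    for ($attempt = 0; ; $attempt++) {
        try {
            return $operation();
        } catch (Throwable $e) {
            if ($attempt + 1 >= $maxAttempts || !$isRetryable($e)) {
                throw $e; // Out of attempts, or a failure that retrying cannot fix.
            }
            $sleepMs(backoffDelayMs($attempt));
        }
    }
}

$calls = 0;
$result = retry(
    function () use (&$calls) {
        if (++$calls < 3) {
            throw new RuntimeException('Geo API timed out');
        }
        return ['city' => 'New York'];
    },
    maxAttempts: 4,
    isRetryable: static fn (Throwable $e): bool => $e instanceof RuntimeException,
    sleepMs: static function (int $ms): void {}, // skip real sleeping in this demo
);
echo $calls, ' ', $result['city'], PHP_EOL; // 3 New York
```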