基于 AWS Lambda 与 Paxos 构建无服务器分布式配置存储的实现


我们需要一个高可用的全局配置服务,用于存储一些关键业务的元数据。传统的方案,如部署一个 ZooKeeper 或 etcd 集群,对于我们某些低流量但高可用的场景而言,其24/7运行的服务器实例带来了不小的闲置资源成本和运维负担。这引发了一个思考:能否利用 Serverless 的按需计费模型,构建一个在保证高可用的前提下,成本结构更优的替代方案?这个想法的核心挑战在于,如何在本质上无状态、短暂的 AWS Lambda 函数之上,实现一个需要持久化状态和节点间共识的分布式系统。

初步构想与技术选型决策

初步构想是使用一组 Lambda 函数模拟一个分布式集群的节点。当需要更新配置时,客户端通过一个统一的入口点触发其中一个 Lambda,这个 Lambda 作为提议者(Proposer)发起一轮 Paxos 共识协议。集群中的其他 Lambda 函数作为接受者(Acceptor)参与决策。一旦超过半数的节点达成共识,新的配置值就被“学习”(Learned)并持久化。

这个架构的关键在于解决 Lambda 的无状态问题。Paxos 协议的正确性依赖于每个节点能够持久化其承诺和接受的最高提案编号。因此,我们需要一个外部的、高可用的持久化存储。

graph TD
    subgraph "客户端"
        Client[外部调用者]
    end

    subgraph "API 网关层"
        Kong[Kong API Gateway]
    end

    subgraph "AWS Serverless 环境"
        ProposerLambda[Lambda: Proposer角色]
        AcceptorLambda1[Lambda: Acceptor角色 1]
        AcceptorLambda2[Lambda: Acceptor角色 2]
        AcceptorLambdaN[Lambda: Acceptor角色 N]

        ProposerLambda -- "Prepare/Accept 请求" --> AcceptorLambda1
        ProposerLambda -- "Prepare/Accept 请求" --> AcceptorLambda2
        ProposerLambda -- "Prepare/Accept 请求" --> AcceptorLambdaN

        subgraph "持久化状态层"
            DynamoDB[(DynamoDB: Paxos日志与状态)]
        end

        ProposerLambda -- "读写Paxos日志" --> DynamoDB
        AcceptorLambda1 -- "读写Paxos日志" --> DynamoDB
        AcceptorLambda2 -- "读写Paxos日志" --> DynamoDB
        AcceptorLambdaN -- "读写Paxos日志" --> DynamoDB
    end

    subgraph "自动化部署"
       GitLab[GitLab CI/CD] -- "部署/更新" --> ProposerLambda
       GitLab -- "部署/更新" --> AcceptorLambda1
       GitLab -- "部署/更新" --> AcceptorLambda2
       GitLab -- "部署/更新" --> AcceptorLambdaN
       GitLab -- "配置" --> Kong
       GitLab -- "创建/迁移" --> DynamoDB
    end

    Client -- "HTTP POST /config" --> Kong
    Kong -- "路由" --> ProposerLambda

技术选型决策如下:

  1. 计算节点: AWS Lambda

    • 原因: 极致的成本效益。对于写操作不频繁的配置中心,只有在发起共识时才产生计算费用。免去了服务器维护、补丁和容量规划的麻烦。
    • 挑战: 函数实例是短暂且无状态的。节点身份、网络发现和状态持久化是必须解决的核心问题。
  2. 共识协议: Paxos 算法

    • 原因: Paxos 是经过严格证明的分布式共识算法,是构建高可用系统的理论基石。在这里,我们实现一个简化的 Multi-Paxos 的单次实例来阐述核心思想。
    • 挑战: 协议本身复杂,实现细节多。在 Lambda 环境下,节点间的通信延迟和不确定性会给协议的活性(Liveness)带来更大挑战。
  3. 状态存储: Amazon DynamoDB

    • 原因: 同样是 Serverless 模型,与 Lambda 完美契合。它提供的高性能、强一致性读写以及条件写入(Conditional Writes)功能,是实现 Paxos 协议中原子性操作的理想工具。我们不需要为闲置的数据库付费。
    • 挑战: 需要精心设计表结构和访问模式,以避免热点和不必要的开销。
  4. API 入口: Kong

    • 原因: 提供一个稳定、统一的 API 入口。客户端无需关心后端 Lambda 函数的具体实现或 ARN。Kong 还能提供认证、速率限制、日志记录等必要的外围能力。
    • 挑战: Kong 本身需要部署和维护,但在我们的场景中,我们将其部署在单个 EC2 或 ECS 实例上,作为架构中唯一的常驻有状态服务。
  5. 部署自动化: GitLab CI/CD

    • 原因: 这个系统包含多个 Lambda 函数、IAM 角色、DynamoDB 表和 Kong 配置。手动管理这些资源极易出错。一个健壮的 CI/CD 流水线是保证部署一致性和可重复性的关键。
    • 挑战: 流水线需要处理 AWS 凭证、多环境配置以及部署失败的回滚策略。

步骤化实现

1. DynamoDB 表设计

我们需要一张表来存储 Paxos 协议运行过程中的所有状态。这张表是整个系统的“真理之源”。

  • 表名: paxos_log
  • 分区键 (Partition Key): ConfigKey (String) - 我们要达成共识的配置项名称,例如 database.connection.string
  • 排序键 (Sort Key): ProposalID (Number) - 提案编号,保证 Paxos 协议的进度。
  • 属性:
    • Phase: (String) 记录当前提案ID所处的阶段,如 promised, accepted
    • Value: (String) 在 accept 阶段携带的配置值。
    • SourceNodeID: (String) 发起该记录的节点ID。
    • Timestamp: (Number) 操作时间戳。

此外,我们还需要一个地方记录每个 ConfigKey 当前已被“学习”的最终值。可以复用该表,使用一个特殊的 ProposalID(如 0)来存储,或者建立一个单独的 learned_values 表。为了简化,我们先聚焦于 paxos_log 表。

2. Paxos 核心逻辑在 Lambda 中的实现 (Python)

我们将创建一个 Lambda 函数,它既能扮演 Proposer 角色,也能扮演 Acceptor 角色。通过传入的事件体来区分当前调用需要执行哪个逻辑。假设我们有一个包含3个节点的集群(3个 Lambda 函数实例的 ARN)。

这是核心的 Python 实现,使用 boto3 与 DynamoDB 交互。

# paxos_lambda/app.py
import os
import json
import logging
import time
import boto3
from botocore.exceptions import ClientError

# --- 配置 ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

DYNAMODB_TABLE_NAME = os.environ.get('DYNAMODB_TABLE', 'paxos_log')
ACCEPTOR_ARNS_JSON = os.environ.get('ACCEPTOR_ARNS', '[]')
ACCEPTOR_ARNS = json.loads(ACCEPTOR_ARNS_JSON)
QUORUM_SIZE = len(ACCEPTOR_ARNS) // 2 + 1

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(DYNAMODB_TABLE_NAME)
lambda_client = boto3.client('lambda')

# --- 核心 Paxos 实现 ---

def handler(event, context):
    """
    Lambda 入口,根据 event['action'] 分发到不同角色
    """
    action = event.get('action')
    config_key = event.get('config_key')
    
    if not action or not config_key:
        return {'statusCode': 400, 'body': json.dumps({'error': 'action and config_key are required'})}
    
    logger.info(f"Action: {action} for key: {config_key}")

    try:
        if action == 'propose':
            # 客户端发起的入口,扮演 Proposer
            value = event.get('value')
            if value is None:
                return {'statusCode': 400, 'body': json.dumps({'error': 'value is required for propose action'})}
            return run_proposer_flow(config_key, value)
        
        elif action == 'prepare':
            # 被 Proposer 调用,扮演 Acceptor 的 prepare 阶段
            proposal_id = event['proposal_id']
            return handle_prepare(config_key, proposal_id)
            
        elif action == 'accept':
            # 被 Proposer 调用,扮演 Acceptor 的 accept 阶段
            proposal_id = event['proposal_id']
            value = event['value']
            return handle_accept(config_key, proposal_id, value)
        
        elif action == 'read':
            # 客户端读取最终确认的值
            return read_learned_value(config_key)

        else:
            return {'statusCode': 400, 'body': json.dumps({'error': f'Unknown action: {action}'})}
    except Exception as e:
        logger.error(f"Error executing action {action}: {e}", exc_info=True)
        return {'statusCode': 500, 'body': json.dumps({'error': 'Internal server error'})}

def run_proposer_flow(config_key, value):
    """
    Proposer 的完整流程: Prepare -> Accept
    """
    # 1. 生成一个唯一的、单调递增的提案ID
    # 在真实项目中,这需要一个更鲁棒的机制,例如独立的ID生成服务或基于时间的混合算法
    # 这里为了简化,使用时间戳 + 随机数
    proposal_id = int(time.time() * 1000)

    # --- Prepare 阶段 ---
    logger.info(f"Proposer: Starting Prepare phase with ProposalID: {proposal_id}")
    prepare_payload = {
        'action': 'prepare',
        'config_key': config_key,
        'proposal_id': proposal_id
    }
    
    promises = broadcast_to_acceptors(prepare_payload)
    
    # 检查是否收到了法定人数的 promise
    if len(promises) < QUORUM_SIZE:
        return {'statusCode': 503, 'body': json.dumps({'error': 'Could not get quorum for prepare phase', 'promises_received': len(promises)})}

    logger.info(f"Proposer: Got {len(promises)} promises. Quorum of {QUORUM_SIZE} reached.")

    # 检查 promise 中是否已有被接受的提案
    highest_accepted_id = -1
    value_to_propose = value
    for p in promises:
        if p.get('status') == 'promised_with_value' and p.get('accepted_id', -1) > highest_accepted_id:
            highest_accepted_id = p['accepted_id']
            value_to_propose = p['accepted_value']
            logger.warning(f"Proposer: Found a previously accepted value '{value_to_propose}' with ID {highest_accepted_id}. Proposing this value instead.")

    # --- Accept 阶段 ---
    logger.info(f"Proposer: Starting Accept phase with ProposalID: {proposal_id} and value: {value_to_propose}")
    accept_payload = {
        'action': 'accept',
        'config_key': config_key,
        'proposal_id': proposal_id,
        'value': value_to_propose
    }
    
    acceptances = broadcast_to_acceptors(accept_payload)
    
    if len(acceptances) < QUORUM_SIZE:
        return {'statusCode': 503, 'body': json.dumps({'error': 'Could not get quorum for accept phase', 'acceptances_received': len(acceptances)})}

    # --- Learner 阶段 ---
    # 达成共识,将最终值写入一个特殊记录
    learn_value(config_key, proposal_id, value_to_propose)
    
    logger.info(f"Proposer: Consensus reached for key '{config_key}' with value '{value_to_propose}'.")
    return {'statusCode': 200, 'body': json.dumps({'status': 'consensus_reached', 'key': config_key, 'value': value_to_propose})}


def broadcast_to_acceptors(payload):
    """
    向所有 Acceptor Lambda 广播消息
    """
    responses = []
    # 在生产环境中,应该使用异步调用或 Step Functions 来提高效率
    for arn in ACCEPTOR_ARNS:
        try:
            response = lambda_client.invoke(
                FunctionName=arn,
                InvocationType='RequestResponse', # 同步调用以获取结果
                Payload=json.dumps(payload)
            )
            response_payload = json.loads(response['Payload'].read())
            if response_payload.get('statusCode') == 200:
                 # 只收集成功的响应
                responses.append(json.loads(response_payload['body']))
        except Exception as e:
            logger.error(f"Failed to invoke acceptor {arn}: {e}")
    return responses

# --- Acceptor 逻辑 ---
def handle_prepare(config_key, proposal_id):
    """
    Acceptor 处理 Prepare 请求
    核心在于原子性地检查并更新 'promised' 状态
    """
    try:
        # 查找该 ConfigKey 已承诺的最高提案ID (max_promised_id)
        # 和已接受的最高提案 (max_accepted_id, accepted_value)
        response = table.query(
            KeyConditionExpression=boto3.dynamodb.conditions.Key('ConfigKey').eq(config_key),
            ScanIndexForward=False, # 按 ProposalID 降序查
            Limit=1
        )
        
        last_entry = response['Items'][0] if response['Items'] else None
        
        max_promised_id = last_entry.get('PromiseID', -1) if last_entry else -1
        
        if proposal_id > max_promised_id:
            # 承诺此提案
            # 使用条件更新确保原子性
            table.update_item(
                Key={'ConfigKey': config_key, 'ProposalID': 0}, # 使用一个固定的记录来存储元数据
                UpdateExpression="SET PromiseID = :pid",
                ConditionExpression="attribute_not_exists(PromiseID) OR PromiseID < :pid",
                ExpressionAttributeValues={":pid": proposal_id}
            )
            
            # 检查是否有已接受的值
            max_accepted_item = find_max_accepted(config_key)
            if max_accepted_item:
                return {
                    'statusCode': 200, 
                    'body': json.dumps({
                        'status': 'promised_with_value', 
                        'accepted_id': max_accepted_item['ProposalID'],
                        'accepted_value': max_accepted_item['Value']
                    })
                }
            else:
                 return {'statusCode': 200, 'body': json.dumps({'status': 'promised'})}
        else:
            # 拒绝,因为已经承诺了更高的提案ID
            return {'statusCode': 409, 'body': json.dumps({'status': 'rejected', 'max_promised_id': max_promised_id})}
            
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            # 并发冲突,另一个请求更新了 PromiseID
            logger.warning(f"Prepare rejected due to concurrent update for key {config_key}")
            return {'statusCode': 409, 'body': json.dumps({'status': 'rejected_concurrently'})}
        raise e

def handle_accept(config_key, proposal_id, value):
    """
    Acceptor 处理 Accept 请求
    """
    try:
        # 检查是否承诺过此 proposal_id (或更高)
        response = table.get_item(Key={'ConfigKey': config_key, 'ProposalID': 0})
        max_promised_id = response.get('Item', {}).get('PromiseID', -1)

        if proposal_id >= max_promised_id:
            # 接受该提案,写入一条新记录
            # 这条写入本身就是原子性的
            table.put_item(
                Item={
                    'ConfigKey': config_key,
                    'ProposalID': proposal_id,
                    'Phase': 'accepted',
                    'Value': value,
                    'Timestamp': int(time.time())
                }
            )
            return {'statusCode': 200, 'body': json.dumps({'status': 'accepted'})}
        else:
            return {'statusCode': 409, 'body': json.dumps({'status': 'rejected', 'max_promised_id': max_promised_id})}

    except ClientError as e:
        logger.error(f"DynamoDB error during accept: {e}")
        raise e
        
def learn_value(config_key, proposal_id, value):
    # 将最终确认的值写入特殊记录
    table.update_item(
        Key={'ConfigKey': config_key, 'ProposalID': 0},
        UpdateExpression="SET LearnedValue = :val, LearnedProposalID = :pid",
        ExpressionAttributeValues={
            ":val": value,
            ":pid": proposal_id
        }
    )

def read_learned_value(config_key):
    response = table.get_item(Key={'ConfigKey': config_key, 'ProposalID': 0})
    item = response.get('Item')
    if item and 'LearnedValue' in item:
        return {'statusCode': 200, 'body': json.dumps({'key': config_key, 'value': item['LearnedValue']})}
    else:
        return {'statusCode': 404, 'body': json.dumps({'error': 'Key not found or no value learned yet'})}

def find_max_accepted(config_key):
    # 查找具有最大 ProposalID 的 "accepted" 记录
    response = table.query(
        KeyConditionExpression=boto3.dynamodb.conditions.Key('ConfigKey').eq(config_key),
        FilterExpression=boto3.dynamodb.conditions.Attr('Phase').eq('accepted'),
        ScanIndexForward=False,
        Limit=1
    )
    return response['Items'][0] if response['Items'] else None

代码中的关键陷阱与考量:

  • 提案ID生成: proposal_id 必须是全局唯一且单调递增的。简单的 time.time() 在高并发下可能冲突。在生产级系统中,需要一个更健壮的ID生成器,例如结合节点ID和时间戳,或者使用一个独立的原子计数器服务。
  • 原子性: DynamoDB 的 ConditionExpression 是实现 Paxos 原子性操作的基石。例如,在 handle_prepare 中,更新 PromiseID 的操作必须是“当且仅当现有的 PromiseID 小于我的 proposal_id 时才成功”,这避免了竞争条件。
  • 通信: broadcast_to_acceptors 中的同步 Lambda 调用 (RequestResponse) 是最简单的实现,但效率低下且容易因单个节点超时而拖慢整个流程。在真实项目中,可以改为异步调用 (Event),然后通过一个聚合器(如 Step Functions 或另一个 Lambda)来收集和判断响应。
  • 活性 (Liveness): 如果 Proposer 在两阶段之间崩溃,或者网络分区导致无法形成法定人数,协议可能会被“卡住”。完整的 Paxos 实现需要更复杂的机制(如超时和 Proposer 抢占)来保证活性,这已超出了本示例的范围。

3. Kong API Gateway 配置

我们使用 Kong 的声明式配置,定义一个 Service 和一个 Route。

# kong.yaml
_format_version: "2.1"
services:
  - name: paxos-config-service
    # URL 指向 AWS API Gateway 为 Lambda 创建的 HTTP 触发器
    # 或者直接使用 Lambda 函数 URL
    url: https://<api-gateway-id>.execute-api.<region>.amazonaws.com/prod/paxos
    protocol: https
    connect_timeout: 15000 # Lambda 冷启动可能较慢
    read_timeout: 15000
    retries: 0 # 共识协议不应简单重试

routes:
  - name: paxos-config-route
    service: paxos-config-service
    paths:
      - /config
    methods:
      - POST
      - GET
    strip_path: true
    
plugins:
  - name: key-auth
    service: paxos-config-service
    config:
      key_names: [apikey]

这个配置创建了一个 /config 端点,所有请求都会被转发到我们的主 Lambda 函数。同时,启用了 key-auth 插件,要求客户端在请求头中提供 apikey 进行认证。

4. GitLab CI/CD 自动化部署

.gitlab-ci.yml 文件将上述所有组件串联起来,实现一键部署。

```yaml

.gitlab-ci.yml

variables:

使用 GitLab CI/CD 变量存储敏感信息

AWS_ACCESS_KEY_ID: $CI_AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY: $CI_AWS_SECRET_ACCESS_KEY
AWS_DEFAULT_REGION: “us-east-1”
APP_NAME: “paxos-config-lambda”
DYNAMODB_TABLE_NAME: “paxos_log”

假设有3个 Lambda,它们的名称

LAMBDA_NAMES: “paxos-node-1 paxos-node-2 paxos-node-3”
KONG_ADMIN_URL: $CI_KONG_ADMIN_URL
KONG_ADMIN_TOKEN: $CI_KONG_ADMIN_TOKEN

stages:

  • build
  • test
  • deploy

build-lambda-package:
stage: build
image: python:3.9
script:
- pip install -r requirements.txt -t ./package
- cp -r paxos_lambda/*.py ./package/
- cd package && zip -r ../lambda_package.zip .
artifacts:
paths:
- lambda_package.zip

run-unit-tests:
stage: test
image: python:3.9
script:
- pip install -r requirements.txt
- pip install moto pytest
# 单元测试应使用 moto 库 mock AWS 服务,验证 Paxos 逻辑的正确性
- pytest tests/

deploy-infrastructure:
stage: deploy
image: amazon/aws-cli
script:
# 1. 创建或更新 DynamoDB 表
- |
aws dynamodb describe-table –table-name ${DYNAMODB_TABLE_NAME} > /dev/null 2>&1 ||
aws dynamodb create-table
–table-name ${DYNAMODB_TABLE_NAME}
–attribute-definitions AttributeName=ConfigKey,AttributeType=S AttributeName=ProposalID,AttributeType=N
–key-schema AttributeName=ConfigKey,KeyType=HASH AttributeName=ProposalID,KeyType=RANGE
–billing-mode PAY_PER_REQUEST
- echo “DynamoDB table ensured.”

# 2. 部署 Lambda 函数
- |
  ACCEPTOR_ARNS_LIST=""
  for LAMBDA_NAME in ${LAMBDA_NAMES}; do
    # ... 此处省略创建 IAM Role 的 aws cli 命令 ...
    ROLE_ARN=$(aws iam get-role --role-name ${LAMBDA_NAME}-role --query 'Role.Arn' --output text)
    
    aws lambda get-function --function-name ${LAMBDA_NAME} > /dev/null 2>&1
    if [ $? -eq 0 ]; then
      echo "Updating function ${LAMBDA_NAME}"
      aws lambda update-function-code --function-name ${LAMBDA_NAME} --zip-file fileb://lambda_package.zip
      FUNC_ARN=$(aws lambda get-function-configuration --function-name ${LAMBDA_NAME} --query 'FunctionArn' --output text)
    else
      echo "Creating function ${LAMBDA_NAME}"
      FUNC_ARN=$(aws lambda create-function \
        --function-name ${LAMBDA_NAME} \
        --runtime python3.9 \
        --role ${ROLE_ARN} \
        --handler app.handler \
        --zip-file fileb://lambda_package.zip \
        --timeout 15 \
        --memory-size 128 \
        --query 'FunctionArn' --output text)
    fi
    
    if [ -z "$ACCEPTOR_ARNS_LIST" ]; then
      ACCEPTOR_ARNS_LIST="\"${FUNC_ARN}\""
    else
      ACCEPTOR_ARNS_LIST="${ACCEPTOR_ARNS_LIST}, \"${FUNC_ARN}\""
    fi
  done
  ACCEPTOR_ARNS_JSON="[${ACCEPTOR_ARNS_LIST}]"
  echo "Acceptor ARNs: ${ACCEPTOR_ARNS_JSON}"

# 3. 更新所有 Lambda 的环境变量,让它们知道彼此的存在
- |
  for LAMBDA_NAME in ${LAMBDA_NAMES}; do
    aws lambda update-function-configuration \
      --function-name ${LAMBDA_NAME} \
      --environment "Variables={DYNAMODB_TABLE=${DYNAMODB_TABLE_NAME},ACCEPTOR_ARNS=${ACCEPTOR_ARNS_JSON}}"
  done
  
# 4. 配置 Kong (使用 deck CLI 工具或直接 curl)
- |
  # 此处需要安装 deck: https://docs.konghq.com/deck/
  # sed "s|https://<api-gateway-id>...|${PROPOSER_LAMBDA_URL}|g" kong.yaml > kong.configured.yaml
  # deck sync -s kong.configured.yaml --kong-addr ${KONG_ADMIN_URL} --headers "kong-admin-token:${KONG_ADMIN_TOKEN}"

方案的局限性与未来展望

这个基于 Serverless 的 Paxos 实现成功地验证了在无状态计算单元上构建分布式共识系统的可能性,并提供了一个极具成本吸引力的模型。然而,在投入生产前,必须清醒地认识到其固有的局限性:

  1. 性能与延迟: 整个共识过程涉及多次 Lambda 的冷启动、函数调用和 DynamoDB 的读写。对于单个写操作,其端到端延迟远高于传统的、基于长连接的 ZooKeeper 或 etcd。这决定了它只适用于对延迟不敏感、写操作频率低的场景。

  2. 实现的复杂性: 分布式共识算法的正确实现非常困难。当前的代码简化了许多细节,如活锁(livelock)的避免、更高效的 Leader 选举(Multi-Paxos)、成员变更等。任何一个细节的错误都可能导致系统状态不一致。

  3. 可观测性: 调试一个跨越多个 Lambda 函数和 DynamoDB 的分布式协议是一项挑战。需要详尽的结构化日志,并通过 AWS X-Ray 等工具建立完整的调用链追踪,才能在出现问题时有效定位。

未来的优化路径可以探索使用 AWS Step Functions 来编排整个 Paxos 流程,这将使状态转换和重试逻辑更加清晰和健壮。此外,可以引入一个轻量级的 Leader 选举机制,一旦选出 Leader,后续的写操作就可以绕过两阶段的 Prepare 过程,直接进入 Accept 阶段,从而显著降低延迟,向 Multi-Paxos 靠拢。


  目录