构建基于AWS Serverless与WebSocket的高吞吐量日志流推送服务的架构权衡与成本优化实践


最初为内部平台构建实时日志查看器的方案是HTTP轮询。这个方案在只有少数几个开发人员使用时勉强可用,但随着用户和日志源的增长,它迅速演变成一场灾难。激增的API请求不仅给后端服务带来了不必要的压力,更导致CloudWatch的API调用费用急剧上升。我们需要一个持久化的、基于推送的模型,一个能够从根本上解决问题的架构。

很自然地,我们转向了WebSockets。AWS提供的Serverless方案,特别是API Gateway WebSocket APIs与AWS Lambda的组合,看起来是理想的起点:无服务器、按需付费、自动扩缩容。初步的构想非常直接:客户端通过API Gateway连接,触发一个$connect Lambda函数将连接ID存入DynamoDB。当有新日志产生时,一个广播服务会从DynamoDB中捞出所有连接ID,然后遍历调用API Gateway的postToConnection接口进行推送。

graph TD
    subgraph "初始方案"
        Client[客户端] -- WebSocket Connect --> APIGW[API Gateway WebSocket]
        APIGW -- "$connect" --> ConnectLambda[Lambda: onConnect]
        ConnectLambda -- "写入 ConnectionID" --> DynamoDB[(Connection Table)]
        
        LogSource[日志源] -- "产生日志" --> BroadcastLambda[Lambda: Broadcast]
        BroadcastLambda -- "1. Scan全表" --> DynamoDB
        BroadcastLambda -- "2. 循环推送" --> APIGW
        APIGW -- "推送日志" --> Client
    end

这个设计在概念验证阶段(10个并发连接)表现完美。然而,当我们将其投入到预生产环境,模拟1000个并发连接并接入中等流量的日志源时,问题立刻暴露无遗。广播Lambda的执行时间从毫秒级飙升到数秒,CloudWatch监控面板上,该函数的错误率和超时次数开始报警。更严重的是,成本估算显示,这种全表扫描(Scan)并逐一推送的模式,其费用将随着连接数的增加呈线性乃至指数级增长。每一次日志广播,我们都在为一次低效的DynamoDB全表扫描和成百上千次独立的API调用付费。这个架构无法扩展,且成本模型是不可持续的。

架构重构:从低效广播到精准扇出

问题的核心在于广播机制。在真实项目中,用户通常只关心特定服务或环境的日志。一个前端开发者可能只订阅webapp-prod的日志,而一个数据库管理员则关心rds-aurora-cluster-1的日志。向所有连接的客户端推送全部日志流,不仅是技术上的浪费,也是业务上的错误。

解决方案是引入“主题”(Topic)或“频道”(Channel)的概念,实现一个发布/订阅(Pub/Sub)模型。客户端在连接后,不再是被动地等待广播,而是主动发送一个subscribe消息,指明其感兴趣的日志主题。

新的架构设计如下:

  1. 连接管理: 客户端连接时,依然通过$connect Lambda记录其connectionId,但此时的DynamoDB表结构需要重新设计,以支持按主题高效查询。
  2. 订阅处理: 我们定义一个自定义路由subscribe。当客户端发送形如{ "action": "subscribe", "topic": "webapp-prod" }的消息时,API Gateway会将其路由到一个新的subscribeHandler Lambda。该函数负责将connectionIdtopic的订阅关系持久化到DynamoDB。
  3. 消息总线: 引入Amazon SNS(或EventBridge)作为解耦的消息总线。日志源不再直接触发广播Lambda,而是将带有主题信息的日志消息发布到SNS Topic。
  4. 扇出推送: 一个专用的fanoutBroadcastHandler Lambda订阅该SNS Topic。当新消息到达时,它被触发,从消息体中解析出topic,然后高效地从DynamoDB中查询出所有订阅了该topicconnectionId列表,并只向这些连接推送消息。
graph TD
    subgraph "优化后方案"
        Client[客户端] -- WebSocket Connect --> APIGW[API Gateway WebSocket]
        APIGW -- "$connect" --> ConnectLambda[Lambda: onConnect]
        ConnectLambda -- "写入Connection" --> ConnectionsTable[(Connections Table)]

        Client -- "{action:'subscribe', topic:'X'}" --> APIGW
        APIGW -- "subscribe" --> SubscribeLambda[Lambda: onSubscribe]
        SubscribeLambda -- "写入订阅关系" --> SubscriptionsTable[(Subscriptions Table)]
        
        LogSource[日志源] -- "发布日志到Topic X" --> SNSTopic[Amazon SNS Topic]
        SNSTopic -- "触发" --> FanoutLambda[Lambda: Fanout Broadcast]
        FanoutLambda -- "1. 查询Topic X的订阅者" --> SubscriptionsTable
        FanoutLambda -- "2. 精准推送" --> APIGW
        APIGW -- "推送日志" --> Client
    end

这种架构将一次昂贵的、O(N)复杂度的Scan操作,转变为一次高效的、O(logK)复杂度的Query操作(N是总连接数,K是单个主题的订阅数)。成本和性能的瓶颈被彻底打破。

核心实现:代码与基建

我们将使用TypeScript编写Lambda函数,并借助esbuild进行打包,以获得极快的构建速度和极小的代码包体积,这对于降低Lambda冷启动时间至关重要。基础设施则通过AWS CDK进行声明式管理。

1. DynamoDB 表设计

我们需要两张表。第一张是Connections表,仅用于跟踪活跃连接,便于在$disconnect时清理订阅。

  • Connections:
    • 主键 (PK): connectionId (String)

第二张是核心的Subscriptions表,用于存储订阅关系。这里的关键是使用全局二级索引(GSI)来支持按topic查询。

  • Subscriptions:
    • 主键 (PK): connectionId (String)
    • 排序键 (SK): topic (String)
    • GSI: topic-index
      • 分区键 (PK): topic (String)
      • 排序键 (SK): connectionId (String)

这个GSI (topic-index) 允许我们用topic作为查询条件,高效地获取所有订阅了该主题的connectionId

2. esbuild 构建配置

在一个真实项目中,我们会有多个Lambda函数的入口文件。esbuild可以轻松处理这种情况。

build.mjs

import * as esbuild from 'esbuild';
import { glob } from 'glob';

// 找到所有 handler 入口文件
const entryPoints = await glob('./src/handlers/*.ts');

const sharedConfig = {
  entryPoints,
  bundle: true,
  platform: 'node',
  target: 'node18',
  outdir: 'dist',
  sourcemap: true,
  minify: process.env.NODE_ENV === 'production',
  external: ['@aws-sdk/*'], // AWS SDK v3 is available in the Lambda runtime
  logLevel: 'info',
};

esbuild.build(sharedConfig).catch(() => process.exit(1));

这个配置会将src/handlers/目录下的每个TypeScript文件都打包成一个独立的JavaScript文件,输出到dist/目录。external配置避免了将AWS SDK打包进去,显著减小了体积。

3. Lambda 处理器核心代码 (TypeScript)

我们将关键逻辑封装在服务层,保持handler的整洁。

src/services/apiGatewayManagementApi.ts

import {
  ApiGatewayManagementApiClient,
  PostToConnectionCommand,
  DeleteConnectionCommand,
} from '@aws-sdk/client-apigatewaymanagementapi';
import { logger } from './logger'; // 假设有一个通用的 logger

const apiGwManagmentApi = new ApiGatewayManagementApiClient({
  apiVersion: '2018-11-29',
  endpoint: process.env.API_GATEWAY_ENDPOINT,
});

/**
 * 向单个连接发送消息。
 * 如果连接已失效 (GoneException),则返回 true,表示需要清理。
 * @param connectionId 连接ID
 * @param data 要发送的数据
 * @returns {Promise<boolean>} 是否需要清理此连接
 */
export async function postToConnection(connectionId: string, data: any): Promise<boolean> {
  try {
    const command = new PostToConnectionCommand({
      ConnectionId: connectionId,
      Data: JSON.stringify(data),
    });
    await apiGwManagmentApi.send(command);
    return false;
  } catch (error: any) {
    // 410 GoneException 表示客户端已经断开连接
    if (error.statusCode === 410) {
      logger.warn(`Stale connection found: ${connectionId}. Marking for cleanup.`);
      return true;
    }
    logger.error(`Failed to post to connection ${connectionId}:`, error);
    // 对于其他错误,我们暂时不清理,可能是临时性问题
    return false;
  }
}

// ... 其他辅助函数,例如断开连接

src/handlers/subscribeHandler.ts

import { APIGatewayProxyHandler } from 'aws-lambda';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, PutCommand } from '@aws-sdk/lib-dynamodb';
import { logger } from '../services/logger';

const ddbClient = new DynamoDBClient({});
const docClient = DynamoDBDocumentClient.from(ddbClient);
const subscriptionsTable = process.env.SUBSCRIPTIONS_TABLE_NAME!;

export const handler: APIGatewayProxyHandler = async (event) => {
  const connectionId = event.requestContext.connectionId!;
  
  let body;
  try {
    body = JSON.parse(event.body || '{}');
  } catch (e) {
    logger.error('Invalid JSON body');
    return { statusCode: 400, body: 'Invalid JSON body' };
  }

  const { topic } = body;
  if (!topic || typeof topic !== 'string') {
    return { statusCode: 400, body: 'Missing or invalid "topic" in request body' };
  }

  const putParams = {
    TableName: subscriptionsTable,
    Item: {
      connectionId: connectionId,
      topic: topic,
      createdAt: new Date().toISOString(),
    },
  };

  try {
    await docClient.send(new PutCommand(putParams));
    logger.info(`Connection ${connectionId} subscribed to topic ${topic}`);
    return { statusCode: 200, body: 'Subscribed.' };
  } catch (error) {
    logger.error(`Failed to subscribe ${connectionId} to ${topic}`, error);
    return { statusCode: 500, body: 'Subscription failed.' };
  }
};

src/handlers/fanoutBroadcastHandler.ts

import { SNSEvent, SNSHandler } from 'aws-lambda';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, QueryCommand, BatchWriteCommand } from '@aws-sdk/lib-dynamodb';
import { logger } from '../services/logger';
import { postToConnection } from '../services/apiGatewayManagementApi';

const ddbClient = new DynamoDBClient({});
const docClient = DynamoDBDocumentClient.from(ddbClient);
const subscriptionsTable = process.env.SUBSCRIPTIONS_TABLE_NAME!;

export const handler: SNSHandler = async (event: SNSEvent) => {
  for (const record of event.Records) {
    const snsMessage = record.Sns;
    
    // 解析日志消息和主题
    let logData;
    try {
      logData = JSON.parse(snsMessage.Message);
    } catch (e) {
      logger.error('Failed to parse SNS message', { message: snsMessage.Message });
      continue; // 跳过格式错误的消息
    }
    
    const { topic, payload } = logData;
    if (!topic) {
      logger.warn('SNS message missing topic, skipping broadcast.', logData);
      continue;
    }

    // 1. 使用 GSI 高效查询订阅者
    const queryParams = {
      TableName: subscriptionsTable,
      IndexName: 'topic-index', // 关键!使用GSI
      KeyConditionExpression: 'topic = :t',
      ExpressionAttributeValues: {
        ':t': topic,
      },
    };

    try {
      const queryResult = await docClient.send(new QueryCommand(queryParams));
      const connections = queryResult.Items || [];
      if (connections.length === 0) {
        logger.info(`No subscribers for topic: ${topic}.`);
        continue;
      }

      logger.info(`Broadcasting to ${connections.length} subscribers for topic: ${topic}`);

      // 2. 并行推送并收集需要清理的失效连接
      const postPromises = connections.map(async (item) => {
        const needsCleanup = await postToConnection(item.connectionId, payload);
        return needsCleanup ? item.connectionId : null;
      });

      const results = await Promise.all(postPromises);
      const staleConnectionIds = results.filter((id): id is string => id !== null);

      // 3. 异步清理失效的订阅关系
      if (staleConnectionIds.length > 0) {
        await cleanupStaleSubscriptions(staleConnectionIds, topic);
      }
    } catch (error) {
      logger.error(`Error during broadcast for topic ${topic}`, error);
    }
  }
};

/**
 * 批量从订阅表中删除失效的连接
 */
async function cleanupStaleSubscriptions(connectionIds: string[], topic: string) {
  logger.info(`Cleaning up ${connectionIds.length} stale subscriptions for topic ${topic}.`);
  
  // DynamoDB BatchWriteItem 每次最多25个请求
  const batchSize = 25;
  for (let i = 0; i < connectionIds.length; i += batchSize) {
    const batch = connectionIds.slice(i, i + batchSize);
    const deleteRequests = batch.map(id => ({
      DeleteRequest: {
        Key: {
          connectionId: id,
          topic: topic,
        },
      },
    }));

    const batchWriteParams = {
      RequestItems: {
        [subscriptionsTable]: deleteRequests,
      },
    };

    try {
      await docClient.send(new BatchWriteCommand(batchWriteParams));
    } catch (error) {
      logger.error('Failed to batch delete stale subscriptions', error);
    }
  }
}

这段fanoutBroadcastHandler代码是整个系统的核心。它演示了:

  • 高效查询: 使用topic-index GSI进行Query,避免了全表扫描。
  • 并发推送: Promise.all并行地向所有订阅者推送消息,最大化吞吐量。
  • 失效连接处理: postToConnection返回一个标志位,指示连接是否已失效。主逻辑收集所有失效连接ID,并调用一个独立的清理函数。
  • 批量清理: cleanupStaleSubscriptions使用BatchWriteCommand高效地批量删除数据库中的陈旧记录,这比单个DeleteItem调用要经济得多。

4. AWS CDK 基础设施定义

// lib/websocket-stack.ts
import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as apigw2 from 'aws-cdk-lib/aws-apigatewayv2';
import { WebSocketLambdaIntegration } from 'aws-cdk-lib/aws-apigatewayv2-integrations';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as sns from 'aws-cdk-lib/aws-sns';
import * as subs from 'aws-cdk-lib/aws-sns-subscriptions';
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs';

export class WebsocketLogStreamStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // DynamoDB Subscriptions Table with GSI
    const subscriptionsTable = new dynamodb.Table(this, 'SubscriptionsTable', {
      partitionKey: { name: 'connectionId', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'topic', type: dynamodb.AttributeType.STRING },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
    });
    subscriptionsTable.addGlobalSecondaryIndex({
      indexName: 'topic-index',
      partitionKey: { name: 'topic', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'connectionId', type: dynamodb.AttributeType.STRING },
    });

    // Lambda Function common properties
    const lambdaProps = {
      runtime: lambda.Runtime.NODEJS_18_X,
      bundling: {
        // 使用 esbuild, 与 build.mjs 的配置类似
        minify: true,
        sourceMap: true,
        externalModules: ['@aws-sdk/*'],
      },
      environment: {
        SUBSCRIPTIONS_TABLE_NAME: subscriptionsTable.tableName,
      },
    };

    // WebSocket API
    const webSocketApi = new apigw2.WebSocketApi(this, 'LogStreamApi', {
      connectRouteOptions: {
        integration: new WebSocketLambdaIntegration('ConnectIntegration', 
          new NodejsFunction(this, 'ConnectHandler', {
            ...lambdaProps,
            entry: 'src/handlers/connectHandler.ts',
          })
        ),
      },
      disconnectRouteOptions: { /* ... */ },
    });

    // Subscribe route
    const subscribeHandler = new NodejsFunction(this, 'SubscribeHandler', {
      ...lambdaProps,
      entry: 'src/handlers/subscribeHandler.ts',
    });
    subscriptionsTable.grantWriteData(subscribeHandler);
    webSocketApi.addRoute('subscribe', {
      integration: new WebSocketLambdaIntegration('SubscribeIntegration', subscribeHandler),
    });

    // SNS Topic for log messages
    const logTopic = new sns.Topic(this, 'LogTopic');

    // Fanout Broadcast Lambda
    const fanoutHandler = new NodejsFunction(this, 'FanoutHandler', {
      ...lambdaProps,
      entry: 'src/handlers/fanoutBroadcastHandler.ts',
      environment: {
        ...lambdaProps.environment,
        API_GATEWAY_ENDPOINT: `https://` + `${webSocketApi.apiId}.execute-api.${this.region}.amazonaws.com/${webSocketApi.apiStage.stageName}`
      }
    });
    logTopic.addSubscription(new subs.LambdaSubscription(fanoutHandler));
    subscriptionsTable.grantReadData(fanoutHandler.role!); // 允许从GSI读取
    subscriptionsTable.grantWriteData(fanoutHandler); // 允许删除陈旧订阅
    webSocketApi.grantManageConnections(fanoutHandler); // 授予 postToConnection 权限

  }
}

局限性与未来迭代路径

这套基于SNS扇出的Serverless WebSocket架构,在成本效益和可扩展性上远超最初的“全量广播”模型。它非常适合需要按频道分发实时数据的场景,例如日志聚合、实时仪表盘、通知中心等。

然而,它并非没有边界。首先,API Gateway对WebSocket消息体有32KB的硬性限制,这使得它不适用于需要传输大型数据块的场景。其次,从日志源发布到SNS,再由Lambda消费并推送到客户端,这个过程引入了额外的延迟(通常在几十到几百毫秒之间),对于需要纳秒级或极低毫秒级延迟的金融高频交易等应用是不合适的。

对于需要管理数百万级并发长连接的超大规模应用,API Gateway的按连接时长计费模型可能会变得昂贵。在这种极端情况下,一个部署在EC2或ECS/EKS上的、基于高性能框架(如Rust的actix-web或Go的gorilla/websocket)构建的、自行管理连接状态的常驻服务,尽管运维复杂性更高,但其总拥有成本可能会更低。

未来的一个优化方向是在服务端实现更精细的过滤逻辑。例如,允许客户端在订阅时提供一个正则表达式,fanoutBroadcastHandler在推送前对日志内容进行匹配,只发送符合条件的消息。这将进一步减少网络流量和客户端的处理负担,但这会增加Lambda的计算时间和DynamoDB中订阅模型的复杂度。


  目录