在 Google Cloud 上构建基于 Kafka 与 CQRS 的异地多活读写分离架构


业务需求被明确定义为构建一个全球分布式的实时状态同步系统。用户在美洲、欧洲、亚洲的写操作,必须在秒级延迟内对全球其他用户可见,同时,用户对数据的读取延迟必须严格控制在50毫秒以下。这是一个典型的异地多活(Multi-Active)技术挑战,它直接将网络延迟、数据一致性与系统可用性这三个分布式系统中的核心矛盾摆在了桌面上。

定义问题:全球一致性与本地延迟的权衡

要实现这个目标,我们面临几个关键的技术决策点:

  1. 写路径(Command Path): 用户发起的写操作(命令)如何被处理?如何保证命令的持久化和最终的全局一致性?跨区域的网络延迟是物理定律,无法消除,一个在美国完成的写操作,数据传输到亚洲必然存在几十到几百毫秒的延迟。
  2. 读路径(Query Path): 用户读取数据时,如何确保最低的延迟?显然,数据必须从离用户最近的区域提供。
  3. 数据复制: 跨区域的数据复制机制是什么?如何保证其可靠性、顺序性,并提供可观测性来监控复制延迟?
  4. 容错性: 单个区域的计算或存储设施发生故障时,系统必须保持可用,其他区域的用户不应受到影响。

方案A:依赖全球分布式数据库

第一个进入考虑范围的方案是利用类似 Google Cloud Spanner 这样的全球分布式数据库。

  • 优势:

    • 提供外部一致性(强一致性),应用层逻辑可以被极大地简化。ACID 事务在全球范围内得到保证。
    • Google 负责底层数据复制、多区域容错和时钟同步(TrueTime),运维负担相对较轻。
  • 劣势:

    • 写延迟: 强一致性的代价是写延迟。每一次写操作都需要在全球多个副本之间通过 Paxos 或 Raft 协议达成共识,其延迟必然受到跨大洲光纤传输速度的限制,通常在100-200毫秒之间,这对于某些需要快速写确认的场景是无法接受的。
    • 成本: 此类服务的成本非常高昂,尤其是在高吞吐量的情况下。
    • 读模型灵活性: 所有的读写都耦合在同一套存储模型上。如果未来需要针对不同查询场景优化出多种物化视图(Materialized Views),在 Spanner 这样的行存数据库上直接操作会变得复杂且低效。

在真实项目中,成本和写延迟是硬性约束。方案A因其固有的物理延迟和高昂的财务成本,被认为是不适合我们这个特定业务场景的。

方案B:基于事件驱动的 CQRS 异步复制架构

我们转而设计一种更复杂的、但更具弹性和成本效益的架构。其核心思想是彻底分离命令(写)和查询(读)的职责,即 CQRS(Command Query Responsibility Segregation)模式,并利用消息队列在区域间异步复制状态变更事件。

  • 优势:

    • 极低的本地延迟: 写操作被发送到用户所在区域的命令处理服务,该服务只需将代表状态变更的事件写入本地的 Kafka 集群即可立即向用户返回成功。这个过程通常在10毫秒内完成。读操作则直接访问本区域内经过优化的读模型副本,延迟极低。
    • 高可用性: 各区域在写操作上是独立的。一个区域的故障不会影响其他区域接受新的写命令。
    • 读写模型解耦: 我们可以为读模型选择最合适的存储技术(例如文档数据库、内存缓存、搜索引擎),独立于写模型进行扩展和优化。
    • 成本可控: 使用 GKE (Google Kubernetes Engine) 部署服务,配合自建或云厂商提供的 Kafka 服务,整体成本远低于全球分布式数据库。
  • 劣势:

    • 最终一致性: 最大的挑战在于系统是最终一致的。一个在欧洲发生的写操作,需要经过一段时间的复制延迟,才能在亚洲的读模型中体现出来。应用层,尤其是前端,必须被设计成能够处理这种数据延迟。
    • 架构复杂性: 引入了 Kafka、跨区域复制、命令服务、查询服务等多个组件,开发和运维的复杂度显著增加。
    • 冲突解决: 如果两个区域的用户几乎同时修改同一个数据实体,可能会产生写冲突,需要在命令处理层设计明确的冲突解决策略(如 Last-Write-Wins)。

经过权衡,我们决定采纳方案B。业务可以容忍秒级的全局数据可见性延迟,但无法妥协于本地的读写性能。架构的复杂性被认为是可以通过标准化的基础设施和流程来管理的。

核心实现概览

我们选择在 GCP 的三个区域(us-central1, europe-west1, asia-east1)部署我们的系统。每个区域都包含一套完整的服务栈。

graph TD
    subgraph "Region: us-central1"
        US_USER[User in US] --> US_ISTIO{Istio Ingress Gateway};
        US_ISTIO --> US_CMD_SVC[Command Service];
        US_CMD_SVC -- Event --> US_KAFKA[Kafka Cluster];
        US_RELAY[Relay Client] --> US_ISTIO;
        US_ISTIO --> US_QUERY_SVC[Query Service];
        US_QUERY_SVC --> US_READ_DB[(Read DB)];
        US_KAFKA_CONSUMER[Projection Consumer] -- Consumes --> US_KAFKA;
        US_KAFKA_CONSUMER -- Updates --> US_READ_DB;
    end

    subgraph "Region: europe-west1"
        EU_USER[User in EU] --> EU_ISTIO{Istio Ingress Gateway};
        EU_ISTIO --> EU_CMD_SVC[Command Service];
        EU_CMD_SVC -- Event --> EU_KAFKA[Kafka Cluster];
        EU_RELAY[Relay Client] --> EU_ISTIO;
        EU_ISTIO --> EU_QUERY_SVC[Query Service];
        EU_QUERY_SVC --> EU_READ_DB[(Read DB)];
        EU_KAFKA_CONSUMER[Projection Consumer] -- Consumes --> EU_KAFKA;
        EU_KAFKA_CONSUMER -- Updates --> EU_READ_DB;
    end

    US_KAFKA -- Replicated by MirrorMaker2 --> EU_KAFKA;
    EU_KAFKA -- Replicated by MirrorMaker2 --> US_KAFKA;

    %% Connect consumers to remote topics
    US_KAFKA_CONSUMER -- Consumes from replicated topic --> US_KAFKA;
    EU_KAFKA_CONSUMER -- Consumes from replicated topic --> EU_KAFKA;

    linkStyle 8 stroke:#ff7f0e,stroke-width:2px,stroke-dasharray: 5 5;
    linkStyle 9 stroke:#ff7f0e,stroke-width:2px,stroke-dasharray: 5 5;

命令侧实现:Go 服务与 Kafka 生产者

命令服务是一个简单的 gRPC 服务,负责接收、验证命令,并将其转化为一个或多个领域事件(Domain Events)发布到 Kafka。这里的关键是服务的无状态性和幂等性设计。

cmd/service/main.go

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net"
	"os"
	"time"

	"github.com/google/uuid"
	"github.com/segmentio/kafka-go"
	"google.golang.org/grpc"

	pb "path/to/your/protobuf"
)

const (
	// 在真实项目中,这些配置应该来自环境变量或配置文件
	kafkaBrokers = "kafka-broker-1:9092,kafka-broker-2:9092"
	eventsTopic  = "product.events"
	gcpRegion    = "us-central1" // 服务部署的区域
)

// 定义事件结构
type ProductStockUpdatedEvent struct {
	EventID   string    `json:"eventId"`
	ProductID string    `json:"productId"`
	ChangeQty int32     `json:"changeQty"`
	Region    string    `json:"region"`
	Timestamp time.Time `json:"timestamp"`
}

type server struct {
	pb.UnimplementedInventoryCommandServer
	kafkaWriter *kafka.Writer
}

// newKafkaWriter 创建一个健壮的 Kafka 生产者
func newKafkaWriter(brokers []string, topic string) *kafka.Writer {
	// 这里的配置对于生产环境至关重要
	return &kafka.Writer{
		Addr: kafka.TCP(brokers...),
		Topic: topic,
		// RequiredAcks: kafka.RequireAll 保证了消息被所有 ISR (In-Sync Replicas) 确认
		// 这是数据持久性的最高保证,但会增加延迟。
		RequiredAcks: kafka.RequireAll,
		// Balancer: &kafka.LeastBytes{} 会将消息发送到分区负载最低的 broker
		Balancer: &kafka.LeastBytes{},
		// 异步写入可以提高吞吐量,但在服务关闭时需要确保 flush
		Async: false,
		// 设置写入超时,防止无限期阻塞
		WriteTimeout: 10 * time.Second,
	}
}

func (s *server) UpdateStock(ctx context.Context, req *pb.UpdateStockRequest) (*pb.UpdateStockResponse, error) {
	log.Printf("Received UpdateStock command for product %s, change: %d", req.ProductId, req.ChangeQty)

	// 1. 输入验证
	if req.ProductId == "" || req.ChangeQty == 0 {
		return nil, fmt.Errorf("invalid request: ProductId and ChangeQty must be provided")
	}

	// 2. 创建领域事件
	event := ProductStockUpdatedEvent{
		EventID:   uuid.New().String(),
		ProductID: req.ProductId,
		ChangeQty: req.ChangeQty,
		Region:    gcpRegion,
		Timestamp: time.Now().UTC(),
	}

	eventBytes, err := json.Marshal(event)
	if err != nil {
		log.Printf("ERROR: Failed to marshal event: %v", err)
		return nil, fmt.Errorf("internal error while processing event")
	}

	// 3. 将事件发布到 Kafka
	// 使用 ProductId作为 Key,确保同一产品的所有事件都进入同一个分区,从而保证处理顺序
	err = s.kafkaWriter.WriteMessages(ctx, kafka.Message{
		Key:   []byte(req.ProductId),
		Value: eventBytes,
	})

	if err != nil {
		log.Printf("ERROR: Failed to write message to Kafka: %v", err)
		// 这里的错误处理很关键,可能需要重试机制或记录到死信队列
		return nil, fmt.Errorf("failed to persist state change")
	}

	log.Printf("Event %s for product %s published successfully", event.EventID, event.ProductID)

	// 4. 立即返回成功确认
	return &pb.UpdateStockResponse{
		Success: true,
		EventId: event.EventID,
	}, nil
}

func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	kafkaWriter := newKafkaWriter(os.Getenv("KAFKA_BROKERS"), eventsTopic)
	defer kafkaWriter.Close()

	s := grpc.NewServer()
	pb.RegisterInventoryCommandServer(s, &server{kafkaWriter: kafkaWriter})

	log.Printf("Command service listening at %v", lis.Addr())
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

单元测试的思路是:使用 mock 接口替代 kafka.Writer,验证服务在接收到 gRPC 请求后,是否正确地构造了事件,并调用了 WriteMessages 方法。

数据复制:配置 Kafka MirrorMaker 2

Kafka MirrorMaker 2 (MM2) 是 Kafka 官方提供的跨集群复制工具。我们在每个区域的 GKE 集群中都部署一个 MM2 实例,负责将其他区域的事件主题复制到本地。

一个简化的 MM2 配置文件 mm2.properties:

# 每个 MM2 实例连接所有集群,但只主动复制远程集群的数据
clusters = us, eu, asia

# 定义每个集群的连接信息
us.bootstrap.servers = kafka-us.domain.local:9092
eu.bootstrap.servers = kafka-eu.domain.local:9092
asia.bootstrap.servers = kafka-asia.domain.local:9092

# 启用从欧洲和亚洲到美国的复制
eu->us.enabled = true
asia->us.enabled = true
# 指定要复制的主题
eu->us.topics = "product.events"
asia->us.topics = "product.events"
# MM2 会自动重命名主题,例如 'eu.product.events',以避免命名冲突
# replication.policy.class 可以自定义重命名策略

# 确保复制任务的配置
replication.factor=3
sync.topic.configs.enabled = true
refresh.topics.interval.seconds = 60

这里的坑在于:必须严密监控 MM2 的 replication-latency-ms 指标,一旦延迟过高,就需要立即告警并介入调查,否则会导致区域间数据差异过大。

查询侧实现:构建只读物化视图

查询服务订阅本地 Kafka 集群中的所有相关主题(包括从其他区域复制过来的),然后根据事件内容更新一个为读取优化的本地数据库(例如 PostgreSQL 或 Firestore)。这个过程称为“投影”(Projection)。

query/service/consumer.ts

import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
import { PrismaClient } from '@prisma/client'; // 使用 Prisma 作为 ORM 示例

// 在真实项目中,这些配置来自环境变量
const KAFKA_BROKERS = process.env.KAFKA_BROKERS!.split(',');
const GCP_REGION = process.env.GCP_REGION!;

const prisma = new PrismaClient();

// 我们需要订阅原始主题和被MM2复制过来的主题
// 例如,在美国区域,我们要订阅 us.product.events, eu.product.events, asia.product.events
// 为了简化,这里假设 MM2 配置了统一的主题名策略
const TOPICS = ['product.events', 'eu.product.events', 'asia.product.events'];

const kafka = new Kafka({
  clientId: `query-service-${GCP_REGION}`,
  brokers: KAFKA_BROKERS,
});

const consumer: Consumer = kafka.consumer({ groupId: `product-projection-group-${GCP_REGION}` });

// 事件处理器,核心业务逻辑
async function handleProductStockUpdated(payload: any): Promise<void> {
  const { productId, changeQty } = payload;
  
  if (!productId || typeof changeQty !== 'number') {
    console.error('Invalid message received:', payload);
    return;
  }
  
  // prisma.product.upsert 是一个原子操作,可以保证幂等性
  // 如果产品不存在,则创建;如果存在,则更新库存
  // 这里的关键是使用 `increment`,而不是 set,来处理事件
  await prisma.product.upsert({
    where: { id: productId },
    update: {
      stock: {
        increment: changeQty,
      },
    },
    create: {
      id: productId,
      stock: changeQty,
    },
  });
  console.log(`Projected stock update for product ${productId}, change: ${changeQty}`);
}

async function run(): Promise<void> {
  await consumer.connect();
  await consumer.subscribe({ topics: TOPICS, fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
      if (!message.value) return;

      try {
        const payload = JSON.parse(message.value.toString());
        // 根据事件类型或其他元数据进行路由
        // 在一个复杂的系统中,这里会有一个事件路由器
        await handleProductStockUpdated(payload);
      } catch (error) {
        console.error('Error processing message:', error);
        // 错误处理:对于无法解析或处理的消息,应推送到死信队列
      }
    },
  });
}

run().catch(e => console.error('Consumer error:', e));

// 优雅关闭
process.on('SIGTERM', async () => {
    await consumer.disconnect();
    await prisma.$disconnect();
    process.exit(0);
});

服务网格:实现区域感知的智能路由

单纯部署服务还不够,我们必须确保用户的请求被路由到离他们最近的区域。这里服务网格(Service Mesh),如 Istio,扮演了至关重要的角色。

我们配置 Istio Ingress Gateway 和 VirtualService 来处理外部流量。更关键的是,我们利用 Istio 的 DestinationRulelocalityLbSetting 来实现区域内优先路由。

istio-config.yaml

apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
  name: global-ingress-gateway
spec:
  selector:
    istio: ingressgateway
  servers:
  - port:
      number: 80
      name: http
      protocol: HTTP
    hosts:
    - "*.your-global-domain.com"
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: api-virtual-service
spec:
  hosts:
  - "api.your-global-domain.com"
  gateways:
  - global-ingress-gateway
  http:
  - route:
    - destination:
        host: command-service.prod.svc.cluster.local
        port:
          number: 50051
      weight: 100
---
apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
  name: command-service-dr
spec:
  host: command-service.prod.svc.cluster.local
  trafficPolicy:
    loadBalancer:
      simple: ROUND_ROBIN
      # 这是核心配置:启用区域感知路由
      localityLbSetting:
        enabled: true
        # 如果本区域没有健康的实例,将50%流量分配到下一个优先区域,剩下50%再到下一个
        # 具体的百分比需要根据业务容灾需求精细调整
        distribute:
        - from: "us-central1/*"
          to:
            "europe-west1/*": 50
            "asia-east1/*": 50

配合 Google Cloud 的全球负载均衡器(Global Load Balancer),用户的流量会被首先导向最近的 GCP 边缘节点,然后进入最近区域的 Istio Gateway。Istio 的 localityLbSetting 确保了在服务级别,流量会优先在区域内部署的 Pod 之间流转,只有当本区域实例全部不可用时,才会将流量故障转移(failover)到其他区域。

前端集成:Relay 与最终一致性的挑战

现在,后端架构已经能够处理全球分布的读写请求了,但前端该如何消费这些数据?使用像 Relay 这样的 GraphQL 客户端,它期望一个统一的、一致的数据图(Graph)。但在我们的架构中,数据是最终一致的。

一个常见的错误是让 Relay 直接通过一个全局的 GraphQL 端点执行 mutation,然后立即重新查询数据。这会导致问题:mutation 在美国执行,但 Relay 客户端可能连接到亚洲的 GraphQL 网关进行查询,此时数据尚未复制过来,用户界面会显示旧数据。

我们的解决方案是调整前端交互模式:

  1. 分离 Mutation 和 Query 路径:

    • Relay 的 Query 和 Subscription 继续通过 GraphQL 端点进行,该端点由本地的 Query Service 提供。
    • Mutation 不通过 GraphQL。而是通过一个独立的、轻量级的客户端直接调用对应区域的 gRPC Command Service
  2. 拥抱事件驱动的 UI 更新:

    • 当用户执行一个写操作时,独立的客户端发送命令到后端。后端立刻返回一个操作成功的确认,可能包含一个事件ID。
    • 前端UI可以做一个乐观更新(Optimistic Update),立即在界面上反映出预期的变化。
    • 同时,前端通过 GraphQL Subscription 订阅相关数据的变更。当数据变更事件通过 Kafka 复制到本地,并被 Query Service 处理后,Query Service 会通过 WebSocket 推送更新。
    • Relay 客户端接收到这个推送,用来自服务器的权威数据更新其本地存储,覆盖之前的乐观更新。这确保了UI最终与后端状态一致。

这个模式将后端的最终一致性模型透明地反映到了前端,使得UI的行为变得可预测且健壮。

架构的局限性与未来迭代

此架构虽然解决了核心问题,但其复杂性也带来了新的挑战。首先,写冲突(write conflicts)问题并未完全解决。如果两个用户在不同区域同时修改同一库存,基于 increment 的操作可以正确处理,但如果是覆盖性更新,就需要引入版本号或 CRDTs 等更复杂的机制来合并冲突。

其次,运维成本不容忽视。监控 Kafka 跨区域复制的延迟、确保每个区域的投影消费者都能跟上事件流,以及管理 Istio 的复杂网络策略,都需要专门的 SRE 团队和强大的可观测性平台。

未来的一个优化路径是探索使用 GCP Pub/Sub 替代自建 Kafka。Pub/Sub 原生支持全球主题和订阅,可以简化跨区域复制的管理。但这需要仔细评估其与 Kafka 在顺序保证、消息保留策略和生态系统工具链方面的差异,以及潜在的成本变化。另一个方向是深化服务网格的应用,利用其流量镜像功能进行更安全的版本发布,或者实现更精细的、基于业务指标的熔断和限流策略。


  目录