Building a Distributed Chaos Engineering Injection and Observation Platform with Raft, eBPF, and SwiftUI


To build a genuinely useful chaos engineering platform for a complex microservice system, the core challenges are the robustness of the control plane and the non-intrusiveness of the data plane. A platform that crashes while executing fault injection is worthless. Likewise, if the injection tooling itself noticeably degrades the target service's performance, its observations lose their meaning. We need a system that keeps itself highly available while performing precise fault injection and real-time observation at very low overhead.

Traditional approaches, such as storing experiment state in a centralized database (e.g., PostgreSQL) and executing injection through an Agent deployed on each target node (using tools like iptables and tc), have obvious architectural flaws. A centralized database is a single point of failure, and tools like iptables introduce non-negligible performance overhead once rule chains grow complex, while offering limited observability.

To address this, we decided to design a new architecture. Its core decisions are:

  1. Control plane: Build a highly available distributed key-value store on the Raft protocol, dedicated to managing the lifecycle state of chaos experiments. This eliminates the single point of failure entirely.
  2. Data plane: Use eBPF on the target nodes for both fault injection and performance-metric collection. eBPF runs in the kernel with very low overhead and unmatched low-level visibility, which suits this scenario well.
  3. Operator interface: Give site reliability engineers (SREs) a native macOS client built with SwiftUI that shows the experiment's impact in real time via gRPC streaming, for the best interactive experience.

The combination looks unconventional, but it precisely addresses the core problems we face: reliability, performance, and real-time feedback.

Architecture Decisions: The Trade-offs of Raft Consensus and eBPF Execution

Before diving into implementation details, it is worth explaining why we rejected the simpler option.

Option A: Centralized metadata store + Agent

  • Architecture: A RESTful API service backed by a single relational database. An Agent on each node polls the API periodically for tasks (e.g., "inject 100ms of latency into service A") and then invokes the tc command to execute them.
  • Pros: Simple to implement, a familiar stack, fast to build.
  • Cons:
    • Poor reliability: If the API service or the database goes down, the entire chaos platform is paralyzed, and in-flight experiments become unmanageable zombies.
    • Performance intrusion: Under high-concurrency traffic, the kernel path taken by tc/iptables is less efficient than eBPF, and a complex rule chain is itself a performance bottleneck.
    • Weak observability: The Agent only sees coarse userspace metrics; it cannot precisely measure packet-level latency or drops inside the kernel network stack.
    • Poor real-time behavior: Polling means high latency for both state synchronization and metric reporting.

Option B: Raft consensus control plane + eBPF data plane

  • Architecture: A 3- or 5-node Raft cluster forms the control plane and exposes a gRPC interface. Experiment state (e.g., Experiment{ID, Target, Type, Params, Status}) is replicated to every node as state-machine log entries. The Agent loads eBPF programs, receives instructions from the control plane over gRPC, and writes parameters into eBPF maps. The eBPF programs perform fault injection directly on the kernel network path and stream observations back in real time through a perf buffer.

  • Pros:

    • High availability: The control plane stays available as long as a majority of cluster nodes are alive. A single machine failure neither loses experiment state nor interrupts the service.
    • Extreme performance: eBPF programs are JIT-compiled and executed inside the kernel with virtually no context-switch overhead, so the performance cost of fault injection and data collection is minimal.
    • Precise observation: eBPF can attach to nearly arbitrary kernel functions, such as tcp_sendmsg or ip_rcv, enabling nanosecond-level latency measurement and exact packet-drop statistics.
    • Real-time interaction: gRPC streaming makes the data flow from the Agent to the control plane, and on to the SwiftUI client, push-based and real time.
  • Cons:

    • Implementation complexity: Requires a solid understanding of the Raft protocol, eBPF programming, and gRPC.
    • Environment requirements: eBPF needs a relatively recent Linux kernel (typically 4.9+, and considerably newer for features such as the TCX attachment used below).

Final decision: For a platform-level tool whose purpose is to improve the stability of core business systems, reliability and performance are non-negotiable. We chose Option B; it costs more implementation complexity, but its architectural advantages are long-term.

Core Implementation Overview

The interaction flow of the whole system is shown in the following sequence diagram (Mermaid source):

sequenceDiagram
    participant SwiftUI_Client as SwiftUI Client (macOS)
    participant Control_Plane as Raft Control Plane (gRPC)
    participant Agent as Node Agent (eBPF Loader)
    participant Kernel as Linux Kernel

    SwiftUI_Client->>+Control_Plane: gRPC: StartExperiment(target, fault_type, params)
    Control_Plane->>Control_Plane: Propose Raft Log (Create Experiment)
    Control_Plane->>Control_Plane: Commit & Apply to FSM
    Control_Plane-->>-SwiftUI_Client: gRPC: Ack(ExperimentID)

    Note over Control_Plane, Agent: Agent connects and streams updates

    Control_Plane->>+Agent: gRPC Stream: NewInstruction(Experiment)
    Agent->>Kernel: Load eBPF program
    Agent->>Kernel: Update eBPF Map with fault params
    Agent->>Kernel: Attach eBPF to network hook (e.g., TC)
    Agent-->>-Control_Plane: gRPC Stream: Ack(InstructionReceived)

    Note over Agent, Kernel: Traffic flows, eBPF injects fault

    Kernel-->>Agent: Perf Buffer: Real-time metrics (latency, drops)
    Agent->>+Control_Plane: gRPC Stream: PushMetrics(ExperimentID, metrics)
    
    Note over SwiftUI_Client, Control_Plane: Client subscribes to experiment results

    SwiftUI_Client->>+Control_Plane: gRPC Stream: SubscribeToMetrics(ExperimentID)
    Control_Plane-->>-SwiftUI_Client: gRPC Stream: PushMetrics(metrics)

1. Raft Control Plane (Go)

We use HashiCorp's raft library to build the control plane. The key piece is implementing the raft.FSM interface, which defines how Raft log entries mutate our application state.
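
For orientation, the sketch below shows how such an FSM (defined next) might be wired into a single hashicorp/raft node. The in-memory stores, the nodeID/bindAddr parameters, and the single-node bootstrap are illustrative assumptions only; a real deployment would use persistent log/stable/snapshot stores and a 3- or 5-node configuration.

// node.go - Sketch: bootstrapping one Raft node around the FSM defined below.
package main

import (
	"os"
	"time"

	"github.com/hashicorp/raft"
)

func newRaftNode(nodeID, bindAddr string) (*raft.Raft, error) {
	config := raft.DefaultConfig()
	config.LocalID = raft.ServerID(nodeID)

	// In-memory stores keep the sketch self-contained; production code would
	// use persistent stores (e.g., raft-boltdb) plus a file snapshot store.
	logStore := raft.NewInmemStore()
	stableStore := raft.NewInmemStore()
	snapshots := raft.NewInmemSnapshotStore()

	transport, err := raft.NewTCPTransport(bindAddr, nil, 3, 10*time.Second, os.Stderr)
	if err != nil {
		return nil, err
	}

	r, err := raft.NewRaft(config, newFSM(), logStore, stableStore, snapshots, transport)
	if err != nil {
		return nil, err
	}

	// Single-node bootstrap for demonstration; a real cluster lists all peers here.
	r.BootstrapCluster(raft.Configuration{
		Servers: []raft.Server{{ID: config.LocalID, Address: transport.LocalAddr()}},
	})
	return r, nil
}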

The state machine stores the details of every current chaos experiment.

// fsm.go - Raft State Machine Implementation

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"sync"

	"github.com/hashicorp/raft"
)

// Experiment represents a single chaos experiment's state.
type Experiment struct {
	ID         string                 `json:"id"`
	Target     string                 `json:"target"`     // e.g., "service:checkout,version:v1"
	FaultType  string                 `json:"fault_type"` // e.g., "NETWORK_LATENCY"
	Parameters map[string]interface{} `json:"parameters"` // e.g., {"delay_ms": 100, "jitter_ms": 20}
	Status     string                 `json:"status"`     // PENDING, RUNNING, COMPLETED, FAILED
}

// Command represents a command to be applied to the FSM.
type Command struct {
	Op  string      `json:"op"` // "CREATE", "STOP"
	Exp *Experiment `json:"exp"`
}

// fsm is our state machine. It stores all active experiments.
type fsm struct {
	mu   sync.RWMutex
	// In a real project, this should be a more efficient data structure.
	experiments map[string]*Experiment 
}

func newFSM() *fsm {
	return &fsm{
		experiments: make(map[string]*Experiment),
	}
}

// Apply applies a Raft log entry to the FSM. This is the core of the state machine.
// It must be deterministic.
func (f *fsm) Apply(log *raft.Log) interface{} {
	f.mu.Lock()
	defer f.mu.Unlock()

	var cmd Command
	if err := json.Unmarshal(log.Data, &cmd); err != nil {
		// This is a critical error, in a production system this should
		// probably panic to signal a corrupted log entry.
		fmt.Printf("failed to unmarshal command: %s\n", err)
		return err
	}

	switch cmd.Op {
	case "CREATE":
		if _, exists := f.experiments[cmd.Exp.ID]; exists {
			fmt.Printf("experiment with ID %s already exists\n", cmd.Exp.ID)
			return nil // Idempotency
		}
		cmd.Exp.Status = "PENDING"
		f.experiments[cmd.Exp.ID] = cmd.Exp
		fmt.Printf("applied CREATE for experiment: %s\n", cmd.Exp.ID)
	case "STOP":
		if exp, exists := f.experiments[cmd.Exp.ID]; exists {
			exp.Status = "COMPLETED"
			fmt.Printf("applied STOP for experiment: %s\n", cmd.Exp.ID)
		}
	default:
		return fmt.Errorf("unrecognized command op: %s", cmd.Op)
	}

	return nil
}

// Snapshot is used to support log compaction. It returns an FSMState
// which can be used to restore the FSM.
func (f *fsm) Snapshot() (raft.FSMSnapshot, error) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	// In a real system, you'd want a more robust serialization format.
	o := make(map[string]*Experiment, len(f.experiments))
	for k, v := range f.experiments {
		o[k] = v
	}
	return &fsmSnapshot{store: o}, nil
}

// Restore is used to restore an FSM from a snapshot. It is called when
// a node is catching up with the leader.
func (f *fsm) Restore(rc io.ReadCloser) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	var store map[string]*Experiment
	if err := json.NewDecoder(rc).Decode(&store); err != nil {
		return err
	}

	f.experiments = store
	return nil
}

// fsmSnapshot handles the serialization of the FSM state.
type fsmSnapshot struct {
	store map[string]*Experiment
}

func (s *fsmSnapshot) Persist(sink raft.SnapshotSink) error {
	err := func() error {
		b, err := json.Marshal(s.store)
		if err != nil {
			return err
		}
		if _, err := sink.Write(b); err != nil {
			return err
		}
		return sink.Close()
	}()

	if err != nil {
		sink.Cancel()
	}
	return err
}

func (s *fsmSnapshot) Release() {}

This FSM implementation is deterministic and supports snapshots, both prerequisites for a production-grade Raft application. When a StartExperiment request arrives over gRPC, the serving node (if it is the Leader) submits a Command with the CREATE op to Raft via Apply(). Once the log entry is committed by a majority of nodes, the Apply method runs on every node, guaranteeing a consistent view of experiment state.
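
To make that last step concrete, here is a sketch of what the leader-side gRPC handler could look like. The pb import path and generated names assume standard protoc-gen-go / protoc-gen-go-grpc output for chaos.proto (shown later), and the chaosServer type and ID scheme are placeholders; the hashicorp/raft calls themselves (State, Apply, ApplyFuture.Error) are the real API.

// server.go - Sketch: gRPC handler proposing a CREATE command through Raft.
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/hashicorp/raft"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	pb "chaos/api" // hypothetical generated package (matches option go_package)
)

type chaosServer struct {
	pb.UnimplementedChaosControllerServer
	raft *raft.Raft
}

func (s *chaosServer) StartExperiment(ctx context.Context, req *pb.StartExperimentRequest) (*pb.StartExperimentResponse, error) {
	// Only the leader may propose; followers should redirect or reject.
	if s.raft.State() != raft.Leader {
		return nil, status.Error(codes.FailedPrecondition, "not the leader")
	}

	params := make(map[string]interface{}, len(req.Parameters))
	for k, v := range req.Parameters {
		params[k] = v
	}

	exp := &Experiment{
		ID:         fmt.Sprintf("exp-%d", time.Now().UnixNano()), // placeholder ID scheme
		Target:     req.Target,
		FaultType:  req.FaultType,
		Parameters: params,
	}

	data, err := json.Marshal(Command{Op: "CREATE", Exp: exp})
	if err != nil {
		return nil, status.Errorf(codes.Internal, "marshal command: %v", err)
	}

	// Apply blocks (up to the timeout) until the entry is committed and applied
	// to the local FSM, so a nil error means the new experiment is replicated.
	if err := s.raft.Apply(data, 5*time.Second).Error(); err != nil {
		return nil, status.Errorf(codes.Unavailable, "raft apply: %v", err)
	}

	return &pb.StartExperimentResponse{ExperimentId: exp.ID}, nil
}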

2. eBPF Fault Injector (C + Go)

The eBPF side has two parts: a kernel-side C program and a userspace Go loader/controller (built here with cilium/ebpf).

Kernel-side C program (injector.c)

This program uses TC's (Traffic Control) clsact hook, which lets us run code at a network device's packet ingress and egress.

// injector.c - A simple eBPF program for network latency injection.
#include <linux/bpf.h>
#include <linux/pkt_cls.h>
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_endian.h>

// Configuration for fault injection, controlled from userspace.
struct {
    __uint(type, BPF_MAP_TYPE_ARRAY);
    __uint(max_entries, 1);
    __type(key, __u32);
    __type(value, __u64); // latency in nanoseconds
} config_map SEC(".maps");

// Helper to add delay. In a real scenario, this is tricky.
// bpf_ktime_get_ns() gives current time. A simple busy loop is not a good
// idea as it hogs CPU. A better approach might involve timers or other
// advanced eBPF features, but for a PoC, we use a simple ktime check.
// WARNING: This is a simplified delay mechanism. Real-world implementations
// are more complex to avoid scheduler issues.
static __always_inline void add_delay(__u64 delay_ns) {
    if (delay_ns == 0) {
        return;
    }
    __u64 start = bpf_ktime_get_ns();
    __u64 deadline = start + delay_ns;
    while (bpf_ktime_get_ns() < deadline) {
        // This is a busy-wait loop. It's inefficient and should be used
        // with extreme caution. It's for demonstration only.
        // A production system might need kernel timers.
    }
}

SEC("tc_ingress")
int inject_latency(struct __sk_buff *skb) {
    __u32 key = 0;
    __u64 *delay_ns_ptr;

    delay_ns_ptr = bpf_map_lookup_elem(&config_map, &key);
    if (!delay_ns_ptr) {
        // No config found, pass through.
        return TC_ACT_OK;
    }

    __u64 delay_ns = *delay_ns_ptr;
    if (delay_ns > 0) {
        // A real system would also add logic to filter by IP, port, etc.
        // using additional maps populated by the userspace agent.
        add_delay(delay_ns);
    }
    
    return TC_ACT_OK;
}

char LICENSE[] SEC("license") = "GPL";

Userspace Go Agent (agent.go)

The Go Agent loads the eBPF program, attaches it to the specified network interface, and dynamically adjusts the latency by updating config_map through the bpf syscall.

// agent/main.go - Userspace component to manage the eBPF program.
package main

import (
	"log"
	"net"
	"time"

	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/link"
	"github.com/cilium/ebpf/rlimit"
)

//go:generate go run github.com/cilium/ebpf/cmd/bpf2go bpf injector.c -- -I/usr/include/bpf

const (
	ifaceName = "eth0" // The network interface to attach to.
)

func main() {
	// It's recommended to remove memory lock limits for eBPF.
	if err := rlimit.RemoveMemlock(); err != nil {
		log.Fatalf("Failed to remove rlimit memlock: %v", err)
	}

	objs := bpfObjects{}
	if err := loadBpfObjects(&objs, nil); err != nil {
		log.Fatalf("loading eBPF objects: %v", err)
	}
	defer objs.Close()

	iface, err := net.InterfaceByName(ifaceName)
	if err != nil {
		log.Fatalf("getting interface %s: %s", ifaceName, err)
	}

	// Attach the TC program.
	l, err := link.AttachTCX(link.TCXOptions{
		Program:   objs.InjectLatency,
		Attach:    ebpf.AttachTCXIngress,
		Interface: iface.Index,
	})
	if err != nil {
		log.Fatalf("could not attach TC program: %s", err)
	}
	defer l.Close()

	log.Printf("eBPF program attached to %s. Press Ctrl+C to exit.", ifaceName)

	// This is where the agent would connect to the Raft control plane via gRPC.
	// For this example, we'll just simulate receiving an instruction.
	// updateFaultParameters(objs.ConfigMap, 100 * time.Millisecond)
    // Here we'd listen on a gRPC stream from the control plane
    // for new instructions.
    
    // Example: inject 100ms latency for 30 seconds.
    log.Println("Injecting 100ms latency...")
    updateFaultParameters(objs.bpfMaps.ConfigMap, 100 * time.Millisecond)

    time.Sleep(30 * time.Second)

    log.Println("Stopping injection.")
    updateFaultParameters(objs.bpfMaps.ConfigMap, 0)
}

// updateFaultParameters updates the eBPF map with the new latency value.
func updateFaultParameters(configMap *ebpf.Map, latency time.Duration) {
	key := uint32(0)
	value := uint64(latency.Nanoseconds())
	
	if err := configMap.Update(&key, &value, ebpf.UpdateAny); err != nil {
		log.Printf("Error updating config map: %v", err)
	}
}

A common mistake is calling helpers the verifier does not allow in an eBPF program, or writing loops that are too complex for it to verify. The add_delay above is a simplified example; a production environment needs a more sophisticated mechanism that avoids burning CPU time.
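
The observation path back out of the kernel deserves a sketch as well. injector.c above does not emit metrics yet; assuming it were extended with a BPF_MAP_TYPE_PERF_EVENT_ARRAY map (called events here) and bpf_perf_event_output calls, the agent could drain samples with cilium/ebpf's perf reader roughly as follows. The latencySample layout is a hypothetical example and must match whatever struct the kernel side emits.

// metrics.go - Sketch: draining per-packet samples from a perf buffer.
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"log"

	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/perf"
)

// latencySample mirrors the (hypothetical) C struct written by the eBPF program.
type latencySample struct {
	TimestampNs uint64
	LatencyNs   uint64
	Dropped     uint32
	_           uint32 // padding
}

func readMetrics(events *ebpf.Map, out chan<- latencySample) error {
	rd, err := perf.NewReader(events, 4096)
	if err != nil {
		return err
	}
	defer rd.Close()

	for {
		rec, err := rd.Read()
		if err != nil {
			if errors.Is(err, perf.ErrClosed) {
				return nil
			}
			return err
		}
		if rec.LostSamples > 0 {
			log.Printf("perf buffer overrun, lost %d samples", rec.LostSamples)
			continue
		}
		var s latencySample
		if err := binary.Read(bytes.NewReader(rec.RawSample), binary.LittleEndian, &s); err != nil {
			log.Printf("decode sample: %v", err)
			continue
		}
		out <- s // forwarded to the control plane via the PushMetrics stream
	}
}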

3. SwiftUI Client and gRPC-Swift

Finally, the SwiftUI client. As the SRE's console, it needs to display experiment state clearly and reflect metrics in real time.

Protobuf definition (chaos.proto)

syntax = "proto3";

package chaos;

option go_package = "chaos/api";

service ChaosController {
    rpc StartExperiment(StartExperimentRequest) returns (StartExperimentResponse);
    rpc StopExperiment(StopExperimentRequest) returns (StopExperimentResponse);
    rpc SubscribeToMetrics(SubscribeRequest) returns (stream MetricUpdate);
}

message StartExperimentRequest {
    string target = 1;
    string fault_type = 2;
    // Using a generic map for parameters
    map<string, string> parameters = 3;
}

message StartExperimentResponse {
    string experiment_id = 1;
}

// ... other request/response messages ...

message SubscribeRequest {
    string experiment_id = 1;
}

message MetricUpdate {
    string experiment_id = 1;
    int64 timestamp_ns = 2;
    // Example metrics from eBPF
    double p99_latency_ms = 3;
    int64 packet_drops = 4;
}
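
On the control-plane side, this server-streaming RPC maps onto a handler that keeps pushing updates until the client disconnects. A minimal sketch, extending the hypothetical chaosServer from the earlier control-plane sketch and assuming protoc-gen-go-grpc stubs plus a per-experiment fan-out channel (subscribe/unsubscribe are placeholder helpers) fed by the agents' metrics:

// Sketch: control-plane handler for the SubscribeToMetrics streaming RPC.
func (s *chaosServer) SubscribeToMetrics(req *pb.SubscribeRequest, stream pb.ChaosController_SubscribeToMetricsServer) error {
	// subscribe/unsubscribe are hypothetical helpers that register this client
	// on a channel receiving MetricUpdate messages for the given experiment.
	updates := s.subscribe(req.ExperimentId)
	defer s.unsubscribe(req.ExperimentId, updates)

	for {
		select {
		case <-stream.Context().Done():
			// The client cancelled the subscription or went away.
			return stream.Context().Err()
		case u := <-updates:
			if err := stream.Send(u); err != nil {
				return err
			}
		}
	}
}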

SwiftUI view model (ExperimentViewModel.swift)

This ViewModel encapsulates all of the gRPC communication logic and publishes data to the SwiftUI views via the Combine framework.

// ExperimentViewModel.swift - Connects SwiftUI views to the gRPC backend.
import Foundation
import Combine
import GRPC
import NIO

@MainActor
class ExperimentViewModel: ObservableObject {
    @Published var experimentID: String = ""
    @Published var p99Latency: Double = 0.0
    @Published var packetDrops: Int64 = 0
    @Published var isSubscribed: Bool = false
    
    private var client: Chaos_ChaosControllerClient
    private var metricsSubscription: ServerStreamingCall<Chaos_SubscribeRequest, Chaos_MetricUpdate>?

    init() {
        // Setup gRPC connection. In a real app, this would be more robust.
        let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
        let channel = ClientConnection.insecure(group: group)
            .connect(host: "127.0.0.1", port: 50051) // Your control plane address
        self.client = Chaos_ChaosControllerClient(channel: channel)
    }
    
    // Function to start a new experiment
    func startLatencyExperiment(target: String, latencyMs: Int) {
        var request = Chaos_StartExperimentRequest()
        request.target = target
        request.faultType = "NETWORK_LATENCY"
        request.parameters = ["delay_ms": String(latencyMs)]
        
        let call = client.startExperiment(request)
        
        call.response.whenSuccess { [weak self] response in
            DispatchQueue.main.async {
                self?.experimentID = response.experimentID
                self?.subscribeToMetrics()
            }
        }
        call.response.whenFailure { error in
            print("Error starting experiment: \(error)")
        }
    }
    
    // Subscribe to real-time metrics for the current experiment
    private func subscribeToMetrics() {
        guard !experimentID.isEmpty else { return }
        
        isSubscribed = true
        
        var request = Chaos_SubscribeRequest()
        request.experimentID = self.experimentID
        
        // SubscribeToMetrics is a server-streaming RPC: we send a single request
        // and the handler closure runs for every MetricUpdate pushed by the server.
        metricsSubscription = client.subscribeToMetrics(request) { [weak self] update in
            DispatchQueue.main.async {
                self?.p99Latency = update.p99LatencyMs
                self?.packetDrops = update.packetDrops
            }
        }
    }
}

// A simple SwiftUI view to display the data.
struct ExperimentView: View {
    @StateObject private var viewModel = ExperimentViewModel()
    
    var body: some View {
        VStack(alignment: .leading, spacing: 20) {
            Text("Chaos Engineering Control")
                .font(.largeTitle)
            
            Button("Start 100ms Latency Test") {
                viewModel.startLatencyExperiment(target: "service:payment", latencyMs: 100)
            }
            .disabled(viewModel.isSubscribed)
            
            if !viewModel.experimentID.isEmpty {
                Text("Experiment ID: \(viewModel.experimentID)")
                
                VStack {
                    Text("P99 Latency: \(String(format: "%.2f", viewModel.p99Latency)) ms")
                    Text("Packet Drops: \(viewModel.packetDrops)")
                }
                .padding()
                .border(Color.gray, width: 1)
            }
        }
        .padding()
    }
}

This Swift code shows how to call a unary RPC (startExperiment) and how to handle a server-streaming RPC (subscribeToMetrics) to receive real-time updates. @MainActor and DispatchQueue.main.async ensure all UI updates happen on the main thread, a basic requirement for SwiftUI.

Architectural Limitations and Future Iterations

While this architecture solves the core reliability and performance problems, it is not perfect.

First, the eBPF fault injection is still rudimentary. It only supports a global network delay; a production-grade system needs fine-grained filtering by cgroup, IP address, port, or even L7 protocol content, which requires more elaborate eBPF programs and map designs (see the sketch below). Beyond network faults, it should also support CPU, memory, and disk I/O fault injection.
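
As an illustration of what the userspace half of such filtering could look like, here is a sketch. It assumes injector.c were extended with a BPF_MAP_TYPE_HASH map named target_map whose key layout matches the Go struct below (destination IPv4 and port in network byte order); none of this exists in the code above.

// filter.go - Sketch: scoping injection to a single destination flow.
package main

import (
	"fmt"
	"net"

	"github.com/cilium/ebpf"
)

// flowKey mirrors a hypothetical C key struct for the target_map hash map.
type flowKey struct {
	Daddr [4]byte // destination IPv4 in wire order
	Dport uint16  // destination port in network byte order
	_     uint16  // explicit padding to match C struct alignment
}

func enableTarget(targetMap *ebpf.Map, ip net.IP, port uint16) error {
	v4 := ip.To4()
	if v4 == nil {
		return fmt.Errorf("only IPv4 targets are supported in this sketch")
	}
	key := flowKey{Dport: port<<8 | port>>8} // htons on a little-endian host
	copy(key.Daddr[:], v4)

	enabled := uint8(1)
	// The eBPF program would look this key up per packet and only delay matches.
	return targetMap.Update(&key, &enabled, ebpf.UpdateAny)
}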

Second, operating the Raft cluster is a challenge in its own right. It provides high availability, but dynamic membership changes, snapshot performance tuning, and the impact of cross-datacenter network latency on Raft performance all need careful consideration and load testing.

Finally, the gRPC traffic is currently plaintext. In production, mTLS must be enabled to encrypt and authenticate all communication between the control plane and the Agents, preventing unauthorized fault injection (a minimal sketch follows). A proper permission system (ACLs) is also needed so that only authorized SREs can launch chaos experiments against specific services.
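
A minimal sketch of enabling mTLS on the control plane's gRPC listener, assuming PEM files (ca.pem, server.pem, server-key.pem) are provisioned out of band:

// tls.go - Sketch: requiring client certificates on the control-plane gRPC server.
package main

import (
	"crypto/tls"
	"crypto/x509"
	"log"
	"os"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

func newMTLSServer() *grpc.Server {
	cert, err := tls.LoadX509KeyPair("server.pem", "server-key.pem")
	if err != nil {
		log.Fatalf("load server key pair: %v", err)
	}

	caPEM, err := os.ReadFile("ca.pem")
	if err != nil {
		log.Fatalf("read CA certificate: %v", err)
	}
	caPool := x509.NewCertPool()
	if !caPool.AppendCertsFromPEM(caPEM) {
		log.Fatal("failed to parse CA certificate")
	}

	creds := credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{cert},
		ClientAuth:   tls.RequireAndVerifyClientCert, // reject agents without a valid client cert
		ClientCAs:    caPool,
		MinVersion:   tls.VersionTLS13,
	})
	return grpc.NewServer(grpc.Creds(creds))
}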

