基于gRPC拦截器为LlamaIndex构建具备动态防火墙能力的Feature Store访问层


定义问题:隔离与共享的冲突

在一个支撑多个AI应用的平台中,一个中心化的Feature Store是数据复用的基石。然而,当这些应用(例如,基于LlamaIndex构建的多个独立的RAG系统)分属不同业务线或租户时,一个严峻的架构挑战浮出水面:如何在保证数据查询高性能的同时,实现严格、动态且细粒度的访问隔离?

具体来说,假设我们有一个Feature Store服务,它存储了来自不同业务的特征数据。同时,有多个LlamaIndex应用实例,每个实例服务于一个特定租户,它们需要通过gRPC从Feature Store中拉取特征向量和元数据,用于构建索引或在RAG流程中进行实时增强。

需求如下:

  1. 强隔离性:租户A的LlamaIndex应用绝对不能访问到租户B的特征数据。
  2. 细粒度控制:不仅要隔离租户,甚至要能控制到某个应用只能访问特定的特征集(Feature Set)。
  3. 动态配置:权限策略必须能够动态更新,无需重启服务。
  4. 高性能:访问控制机制本身不能成为性能瓶颈,因为Feature Store的查询延迟至关重要。
  5. 关注点分离:安全逻辑不应侵入Feature Store的核心业务逻辑。

方案A:传统网络防火墙与服务副本

一个直接的思路是物理或网络层面的隔离。我们可以为每个租户部署一套独立的Feature Store服务实例,并利用VPC、子网或Kubernetes Namespace等机制进行网络隔离。访问控制则通过配置网络安全组(Security Groups)或传统防火墙规则实现,只允许特定来源的IP访问对应的服务实例。

优势分析:

  • 隔离性极强:网络层面的隔离是最彻底的,几乎没有跨租户数据泄露的风险。
  • 技术成熟:这是久经考验的传统安全模型,运维人员对此非常熟悉。

劣势分析:

  • 资源浪费与成本高昂:为每个租户部署完整副本,导致巨大的计算和存储资源冗余。在租户数量庞大时,这种模式是不可持续的。
  • 管理复杂性:维护成百上千套独立部署的服务和对应的防火墙规则,是一场运维噩梦。
  • 缺乏应用层感知:传统防火墙工作在L3/L4层,它只认IP和端口。它无法理解“租户A的应用llama-agent-search有权访问特征集product_embeddings_v2”这样的应用层语义。所有细粒度的控制要么无法实现,要么需要为每个细微的权限组合创建新的服务副本和网络规则,这显然不现实。
  • 动态性差:更新防火墙规则通常需要一定的流程和时间,无法满足实时动态调整权限的需求。

在真实项目中,这种方案仅适用于少数、大型且预算充足的租户。对于一个需要支持大量动态租户的平台来说,其成本和复杂性使其不具备可行性。

方案B:业务逻辑内置权限校验

另一个方案是在Feature Store服务的业务逻辑代码中直接实现权限校验。每个gRPC方法的实现开始时,都会有一段代码用于解析请求中的身份信息(如Token),然后查询权限数据库,判断当前调用者是否有权执行该操作。

优势分析:

  • 实现细粒度控制:由于校验逻辑在应用层,它可以访问所有请求上下文,能够轻松实现“用户X是否有权访问资源Y”的判断。
  • 无需额外基础设施:所有逻辑都在应用代码中,不依赖复杂的网络配置。

劣势分析:

  • 代码耦合与职责不清:安全校验逻辑与核心业务逻辑紧密耦合。每个gRPC方法都需要重复编写类似的校验代码,违反了单一职责原则。这使得代码难以阅读和维护。
  • 可维护性差:当权限模型变更时,可能需要修改大量的业务代码。审计权限相关的逻辑也变得非常困难,因为它们散落在代码库的各个角落。
  • 容易出错:开发人员在实现新功能时,很容易忘记添加或正确实现权限校验,从而引入安全漏洞。

这种方案将安全这个横切关注点(Cross-cutting Concern)错误地与业务逻辑混合在一起,导致系统长期维护成本急剧上升。

最终选择:基于gRPC拦截器的可编程防火墙

我们选择第三条路:利用gRPC的拦截器(Interceptor)机制,在请求到达业务逻辑之前,构建一个独立的、可编程的应用层防火墙。这个“防火墙”是一个集中的策略执行点(Policy Enforcement Point),它负责解析身份、评估策略,并决定是放行请求还是直接拒绝。

sequenceDiagram
    participant LlamaIndexClient as LlamaIndex Agent (Client)
    participant Interceptor as gRPC Firewall Interceptor
    participant FeatureStoreSvc as Feature Store Service Logic

    LlamaIndexClient->>+Interceptor: GetFeatures(request, metadata[token: tenant_a_token])
    Note right of Interceptor: 1. 拦截请求
    Interceptor->>Interceptor: 2. 从Metadata中提取Token
    Interceptor->>Interceptor: 3. 验证Token并解析出身份 (tenant_id: "tenant-a", agent_id: "search-agent")
    Interceptor->>Interceptor: 4. 从请求体中解析资源 (feature_set_id: "fs-prod-vec")
    Interceptor->>Interceptor: 5. 加载并查询访问控制策略 (ACL)
    Note right of Interceptor: Check: Tenant "tenant-a" can access "fs-prod-vec"? -> Yes
    Interceptor->>+FeatureStoreSvc: GetFeatures(request)
    FeatureStoreSvc-->>-Interceptor: features_response
    Interceptor-->>-LlamaIndexClient: features_response

    LlamaIndexClient->>+Interceptor: GetFeatures(request, metadata[token: tenant_b_token])
    Note right of Interceptor: 1. 拦截请求 (租户B的请求)
    Interceptor->>Interceptor: 2. 提取Token, 解析身份 (tenant_id: "tenant-b")
    Interceptor->>Interceptor: 3. 从请求体中解析资源 (feature_set_id: "fs-prod-vec")
    Interceptor->>Interceptor: 4. 查询访问控制策略 (ACL)
    Note right of Interceptor: Check: Tenant "tenant-b" can access "fs-prod-vec"? -> No
    Interceptor-->>-LlamaIndexClient: Error: Permission Denied

选择理由:

  1. 关注点分离:安全逻辑从业务代码中完全剥离。Feature Store的开发者可以专注于实现核心的数据存取功能,而无需关心复杂的认证授权。
  2. 集中式控制:所有请求的访问控制都在拦截器中统一处理。这使得审计、更新和维护安全策略变得极为简单和安全。
  3. 高性能:gRPC拦截器是框架原生支持的,位于请求处理路径的开端,开销极小。策略本身可以加载到内存中,实现毫秒甚至微秒级的快速决策。
  4. 完全可编程与动态:访问控制策略可以从配置文件、数据库或配置中心(如etcd)动态加载,实现权限的实时变更而无需重启服务。
  5. 透明性:对于业务逻辑代码而言,拦截器的存在是透明的。对于客户端而言,它只需要在请求的元数据中附带身份凭证即可。

这种架构将防火墙的概念从网络层提升到了应用层,创建了一个能够理解gRPC方法、请求参数和租户身份的智能屏障。

核心实现概览

我们将使用Go语言来实现这个gRPC服务。

1. 定义 protobuf 接口

首先,我们需要一个清晰的 proto 定义,它描述了我们的Feature Store服务。关键在于,请求消息体需要包含明确的资源标识符。

// feature_store.proto
syntax = "proto3";

package featurestore;

option go_package = ".;featurestore";

// Feature Store Service definition
service FeatureStore {
  // Get a set of features
  rpc GetFeatures(GetFeaturesRequest) returns (GetFeaturesResponse);
}

message GetFeaturesRequest {
  // The unique identifier for the feature set being requested.
  // This is the resource our firewall will check against.
  string feature_set_id = 1;

  // A list of entity IDs to retrieve features for.
  repeated string entity_ids = 2;
}

message Feature {
  string entity_id = 1;
  map<string, double> values = 2;
}

message GetFeaturesResponse {
  repeated Feature features = 1;
}

2. 实现可动态加载的访问控制策略

我们的“防火墙”规则需要一个数据结构来承载。在真实项目中,这可能存储在数据库或配置中心。这里我们用一个JSON文件来模拟,并实现一个可以热加载的策略管理器。

config/acl.json:

{
  "policies": {
    "tenant-a": {
      "allowed_feature_sets": ["fs-prod-vec", "fs-user-profile"]
    },
    "tenant-b": {
      "allowed_feature_sets": ["fs-user-profile"]
    },
    "internal-admin": {
      "allowed_feature_sets": ["*"]
    }
  }
}

acl_provider.go:

package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"sync"

	"go.uber.org/zap"
)

// Policy represents the access rights for a single tenant.
type Policy struct {
	AllowedFeatureSets []string `json:"allowed_feature_sets"`
}

// ACLProvider manages and provides access to the access control list.
// It's designed to be thread-safe and support hot-reloading.
type ACLProvider struct {
	logger   *zap.Logger
	filePath string
	mu       sync.RWMutex
	policies map[string]Policy
}

// NewACLProvider creates a new ACL provider and loads the initial policies.
func NewACLProvider(logger *zap.Logger, path string) (*ACLProvider, error) {
	p := &ACLProvider{
		logger:   logger,
		filePath: path,
		policies: make(map[string]Policy),
	}
	if err := p.loadPolicies(); err != nil {
		return nil, fmt.Errorf("initial policy load failed: %w", err)
	}
	return p, nil
}

// loadPolicies reads the policy file and updates the in-memory map.
func (p *ACLProvider) loadPolicies() error {
	p.mu.Lock()
	defer p.mu.Unlock()

	data, err := ioutil.ReadFile(p.filePath)
	if err != nil {
		return fmt.Errorf("failed to read policy file %s: %w", p.filePath, err)
	}

	var config struct {
		Policies map[string]Policy `json:"policies"`
	}
	if err := json.Unmarshal(data, &config); err != nil {
		return fmt.Errorf("failed to unmarshal policy JSON: %w", err)
	}

	p.policies = config.Policies
	p.logger.Info("Successfully loaded ACL policies", zap.Int("policy_count", len(p.policies)))
	return nil
}

// IsAllowed checks if a given tenant is allowed to access a specific feature set.
func (p *ACLProvider) IsAllowed(tenantID, featureSetID string) bool {
	p.mu.RLock()
	defer p.mu.RUnlock()

	policy, ok := p.policies[tenantID]
	if !ok {
		// Default deny if tenant has no policy
		return false
	}

	for _, allowedSet := range policy.AllowedFeatureSets {
		if allowedSet == "*" || allowedSet == featureSetID {
			return true
		}
	}

	return false
}

// In a real application, you would add a file watcher to call loadPolicies on change
// for hot-reloading.

3. 核心:防火墙拦截器的实现

这是整个架构的核心。拦截器是一个函数,它接收上下文、请求和下一个处理程序的引用。

firewall_interceptor.go:

package main

import (
	"context"
	"fmt"
	"strings"

	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

// resourceExtractor defines an interface to extract the resource ID from a request.
// This makes the interceptor more generic and reusable for different RPCs.
type resourceExtractor func(req interface{}) (string, error)

// AuthInfo holds the authenticated identity of the caller.
type AuthInfo struct {
	TenantID string
}

// FirewallInterceptor provides a gRPC unary server interceptor for authorization.
type FirewallInterceptor struct {
	logger *zap.Logger
	acl    *ACLProvider
}

// NewFirewallInterceptor creates a new firewall interceptor.
func NewFirewallInterceptor(logger *zap.Logger, acl *ACLProvider) *FirewallInterceptor {
	return &FirewallInterceptor{
		logger: logger,
		acl:    acl,
	}
}

// Authorize is the unary interceptor function.
func (i *FirewallInterceptor) Authorize(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	// 1. Authenticate the caller by extracting identity from metadata.
	authInfo, err := i.authenticate(ctx)
	if err != nil {
		i.logger.Warn("Authentication failed", zap.Error(err))
		return nil, status.Error(codes.Unauthenticated, "authentication failed")
	}

	// 2. Extract the resource being accessed from the request payload.
	// This part needs to be aware of the request message structure.
	var resourceID string
	if getFeaturesReq, ok := req.(interface{ GetFeatureSetId() string }); ok {
		resourceID = getFeaturesReq.GetFeatureSetId()
	} else {
		// This should not happen if the interceptor is applied correctly.
		i.logger.Error("Interceptor applied to an unsupported request type", zap.String("method", info.FullMethod))
		return nil, status.Error(codes.Internal, "interceptor configuration error")
	}

	if resourceID == "" {
		return nil, status.Error(codes.InvalidArgument, "feature_set_id is required")
	}

	// 3. Check against the ACL provider (the core firewall logic).
	if !i.acl.IsAllowed(authInfo.TenantID, resourceID) {
		i.logger.Warn("Authorization denied",
			zap.String("tenant_id", authInfo.TenantID),
			zap.String("resource_id", resourceID),
			zap.String("method", info.FullMethod),
		)
		return nil, status.Errorf(codes.PermissionDenied, "tenant '%s' is not allowed to access resource '%s'", authInfo.TenantID, resourceID)
	}

	i.logger.Debug("Authorization successful",
		zap.String("tenant_id", authInfo.TenantID),
		zap.String("resource_id", resourceID),
	)

	// 4. If allowed, proceed to the actual gRPC handler.
	return handler(ctx, req)
}

// authenticate extracts and validates the token from metadata.
// In a real system, this would involve JWT validation or a call to an identity service.
func (i *FirewallInterceptor) authenticate(ctx context.Context) (*AuthInfo, error) {
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return nil, fmt.Errorf("missing metadata from context")
	}

	authHeaders := md.Get("authorization")
	if len(authHeaders) == 0 {
		return nil, fmt.Errorf("missing authorization header")
	}

	// For simplicity, we use a "Bearer tenant-id" scheme.
	// In production, use JWTs.
	token := authHeaders[0]
	parts := strings.Split(token, " ")
	if len(parts) != 2 || !strings.EqualFold(parts[0], "Bearer") {
		return nil, fmt.Errorf("invalid authorization header format")
	}

	tenantID := parts[1]
	if tenantID == "" {
		return nil, fmt.Errorf("tenant ID in token is empty")
	}

	return &AuthInfo{TenantID: tenantID}, nil
}

4. 组装 gRPC 服务器

现在,我们将拦截器、ACL提供者和我们的业务服务逻辑(一个简单的桩实现)组装起来。

server.go:

package main

import (
	"context"
	"fmt"
	"log"
	"net"

	pb "your_project/featurestore" // Import generated protobuf package

	"go.uber.org/zap"
	"google.golang.org/grpc"
)

// featureStoreServer is a mock implementation of our service.
type featureStoreServer struct {
	pb.UnimplementedFeatureStoreServer
	logger *zap.Logger
}

func (s *featureStoreServer) GetFeatures(ctx context.Context, req *pb.GetFeaturesRequest) (*pb.GetFeaturesResponse, error) {
	s.logger.Info("Executing business logic for GetFeatures",
		zap.String("feature_set_id", req.FeatureSetId),
		zap.Strings("entity_ids", req.EntityIds),
	)
	// In a real implementation, this would query a database like Redis or Cassandra.
	// We return dummy data here.
	return &pb.GetFeaturesResponse{
		Features: []*pb.Feature{
			{EntityId: req.EntityIds[0], Values: map[string]double{"embedding_0": 0.1, "embedding_1": 0.9}},
		},
	}, nil
}

func main() {
	logger, _ := zap.NewProduction()
	defer logger.Sync()

	// 1. Initialize ACL provider.
	aclProvider, err := NewACLProvider(logger, "config/acl.json")
	if err != nil {
		logger.Fatal("Failed to create ACL provider", zap.Error(err))
	}

	// 2. Initialize the firewall interceptor.
	firewall := NewFirewallInterceptor(logger, aclProvider)

	// 3. Create a gRPC server with our unary interceptor.
	server := grpc.NewServer(
		grpc.UnaryInterceptor(firewall.Authorize),
	)

	// 4. Register our service implementation.
	pb.RegisterFeatureStoreServer(server, &featureStoreServer{logger: logger})

	listener, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	logger.Info("gRPC server with firewall interceptor listening on :50051")
	if err := server.Serve(listener); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

5. LlamaIndex 客户端集成思路

在客户端,例如一个使用LlamaIndex的Go应用,调用此服务时需要注入认证元数据。

package main

import (
	"context"
	"log"
	"time"

	pb "your_project/featurestore"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"
)

// perRPCCredentials implements credentials.PerRPCCredentials for attaching auth tokens.
type perRPCCredentials struct {
	token string
}

func (c *perRPCCredentials) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	return map[string]string{
		"authorization": "Bearer " + c.token,
	}, nil
}

func (c *perRPCCredentials) RequireTransportSecurity() bool {
	// In production, this should be true, and you should use TLS.
	return false
}

func main() {
	// --- Client for Tenant A (should succeed) ---
	connA, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer connA.Close()
	clientA := pb.NewFeatureStoreClient(connA)

	// Context with authentication for Tenant A
	ctxA := metadata.AppendToOutgoingContext(context.Background(), "authorization", "Bearer tenant-a")
	
	// Hypothetical LlamaIndex logic that needs features
	log.Println("LlamaIndex Agent for Tenant A fetching features for 'fs-prod-vec'...")
	respA, errA := clientA.GetFeatures(ctxA, &pb.GetFeaturesRequest{FeatureSetId: "fs-prod-vec", EntityIds: []string{"product-123"}})
	if errA != nil {
		log.Printf("Tenant A failed: %v", errA)
	} else {
		log.Printf("Tenant A succeeded: Received %d features", len(respA.Features))
	}

	// --- Client for Tenant B (should be denied) ---
	connB, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer connB.Close()
	clientB := pb.NewFeatureStoreClient(connB)
	
	ctxB := metadata.AppendToOutgoingContext(context.Background(), "authorization", "Bearer tenant-b")
	log.Println("LlamaIndex Agent for Tenant B trying to fetch features for 'fs-prod-vec'...")
	_, errB := clientB.GetFeatures(ctxB, &pb.GetFeaturesRequest{FeatureSetId: "fs-prod-vec", EntityIds: []string{"product-123"}})
	if errB != nil {
		log.Printf("Tenant B correctly blocked: %v", errB) // Expects "PermissionDenied"
	} else {
		log.Printf("Tenant B INCORRECTLY succeeded!")
	}
}

架构的扩展性与局限性

此架构模式具有良好的扩展性。防火墙拦截器可以被链式调用,形成一个处理管道。我们可以轻松地加入其他拦截器来处理速率限制、请求日志、指标收集(metrics)和分布式追踪,而无需修改任何业务代码。策略引擎 ACLProvider 也可以替换为更复杂的系统,例如连接到 Open Policy Agent (OPA) 或其他外部授权服务,以实现更灵活的基于属性的访问控制(ABAC)。

然而,这个方案也存在局限性。它本质上是一个应用层的安全机制,并不能替代网络层的安全防护。它无法防御DDoS攻击或网络扫描。因此,它应被视为纵深防御策略中的一层,与网络防火墙、WAF等协同工作。此外,当拦截器中的逻辑变得异常复杂,或者需要进行大量外部IO调用(例如每次都去请求授权服务)时,其性能开销需要被仔细评估和监控,避免它成为系统瓶颈。最后,确保客户端正确、安全地管理和传递身份凭证,是整个安全链条中同样关键的一环。


  目录