基于TDD与OCI工件实现Flux CD驱动的HBase声明式状态管理


我们团队的 HBase 集群 schema 和 ACL 管理一度陷入混乱。变更流程依赖于工程师手动执行 hbase shell 命令,或是运行一次性的 Groovy 脚本。这种方式在开发环境尚可应付,但在生产环境,它成了事故的温床:权限配置错误导致数据泄露风险,不兼容的列族修改引发线上服务中断。每一次变更都如履薄冰,变更记录散落在 Confluence 页面和 Jira Ticket 中,无法形成统一、可审计的视图。我们迫切需要一套声明式的、版本化的、经过严格测试的自动化流程来管理 HBase 的状态。

最初的构想很简单:将 schema 定义(比如 create 'my_table', 'cf1' 这样的命令)存放在 Git 仓库中,然后通过 CI/CD pipeline 执行。但这很快就暴露了问题:这些命令是过程式的,不是声明式的;它们无法表达最终状态,也难以进行幂等性检查。更重要的是,如何在一个变更被应用到生产环境 之前,就确保它是安全、兼容的?这就是测试驱动开发(TDD)思想切入的契机。

我们将目标锁定在 GitOps 模式,并选择了 Flux CD 作为核心工具。但我们很快意识到,单纯用 Flux CD 同步 Git 仓库里的 YAML 文件,并不能直接作用于 HBase。HBase 的状态管理需要一个“翻译官”。同时,我们希望打包的不仅仅是 schema 定义,还包括关联的 ACL 策略、预聚合配置等,形成一个完整的、不可变的、版本化的“状态单元”。这让我们把目光投向了 OCI(Open Container Initiative)工件。OCI 不仅仅是用来存放容器镜像的,它可以作为任何云原生工件的通用打包格式。

最终的技术方案轮廓逐渐清晰:

  1. 声明式定义: 用 YAML 文件来声明式地定义 HBase 表的结构、列族属性和用户权限。
  2. TDD 验证: 开发一套测试框架。任何对声明式定义的变更,都必须先通过本地测试,验证其语法、逻辑以及对现有状态的兼容性。这是一个 CI 流程中的强制关卡。
  3. OCI 打包: 测试通过后,将这些 YAML 文件打包成一个带版本的 OCI 工件,推送到容器镜像仓库。这个工件代表了一个经过验证的、完整的 HBase 状态快照。
  4. Flux CD 应用: Flux CD 监控 OCI 仓库。当有新版本的工件发布时,它会自动拉取,并在 Kubernetes 集群中触发一个 Job。这个 Job 运行一个特制的 reconciler(协调器),负责解析 OCI 工件中的内容,并将其转化为实际的 HBase API 调用,最终使 HBase 集群的状态与声明保持一致。

整个流程可以用下面的 Mermaid 图来概括:

graph TD
    subgraph "工程师本地环境/CI"
        A[1. 修改 YAML 状态定义] --> B{2. 运行 TDD 验证套件};
        B -- 测试通过 --> C[3. 打包成 OCI 工件];
        C --> D[4. 推送 v1.1.0 到 OCI Registry];
        B -- 测试失败 --> E[修复定义或测试];
    end

    subgraph "Kubernetes 集群 (Flux CD)"
        F[5. Flux OCI Source 发现新版本] --> G[6. 触发 Kustomization];
        G --> H[7. 运行一个一次性的 K8s Job];
    end

    subgraph "HBase 集群"
        I[9. HBase Master/RegionServers]
    end

    H --> J[8. Job 内的 Reconciler 解析工件];
    J -- HBase API 调用 --> I;

    subgraph "OCI Registry"
        D -- Pushes --> K((Registry));
        F -- Polls --> K;
    end

第一步:定义声明式状态契约

我们需要一种机器可读的格式来描述 HBase 的状态。YAML 是个不错的选择。我们定义一个 HBaseTable 的 CRD-like 结构,虽然我们初期不打算实现一个完整的 Operator,但遵循这种结构有助于未来的扩展。

my-prod-table.yaml:

# my-prod-table.yaml
# 定义一个名为 'user_profiles' 的 HBase 表及其相关状态
apiVersion: hbase.corp.com/v1alpha1
kind: HBaseTable
metadata:
  name: user_profiles
  namespace: data-engineering
spec:
  # 表的列族定义
  columnFamilies:
    - name: info # 用户基本信息
      properties:
        VERSIONS: 1
        COMPRESSION: 'SNAPPY'
        TTL: '2592000' # 30 days
        BLOCKCACHE: 'true'
    - name: metrics # 用户行为指标
      properties:
        VERSIONS: 5
        COMPRESSION: 'SNAPPY'
        IN_MEMORY: 'false'
  
  # 表的访问控制列表 (ACLs)
  acls:
    # 'data-analytics' 组拥有读权限
    - principal: 'data-analytics'
      type: Group
      permissions: "R" # Read
    # 'user-service' 服务账号拥有读写权限
    - principal: 'user-service-account'
      type: User
      permissions: "RW" # Read, Write
    # 'admin-group' 拥有管理权限
    - principal: 'admin-group'
      type: Group
      permissions: "RWCA" # Read, Write, Create, Admin

这个 YAML 文件清晰地定义了 user_profiles 表的两个列族及其属性,以及三个不同角色的权限。这就是我们期望的“最终状态”。

第二步:为状态变更构建 TDD 防线

这是整个方案的核心。任何对上述 YAML 文件的修改,比如增加一个列族,或者修改一个权限,都必须通过测试。我们使用 Java 和 hbase-testing-util 来构建这个测试环境。

HBaseSchemaValidatorTest.java:

// 使用 JUnit 5 进行测试
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.junit.jupiter.api.*;

import java.io.IOException;

import static org.junit.jupiter.api.Assertions.*;

/**
 * 这是一个核心的验证器测试类。
 * 它模拟一个已存在的 HBase 表状态,然后尝试应用一个新的 YAML 定义,
 * 并检查变更是否兼容、合法。
 */
class HBaseSchemaValidatorTest {

    private static HBaseTestingUtility testingUtility;
    private static Admin admin;

    @BeforeAll
    public static void setup() throws Exception {
        testingUtility = new HBaseTestingUtility();
        testingUtility.startMiniCluster();
        admin = testingUtility.getAdmin();
    }

    @AfterAll
    public static void teardown() throws Exception {
        testingUtility.shutdownMiniCluster();
    }

    @BeforeEach
    public void cleanupTables() throws IOException {
        // 每个测试用例开始前,确保环境干净
        TableName[] tableNames = admin.listTableNames();
        for (TableName tableName : tableNames) {
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
        }
    }

    /**
     * 测试场景: 在一个已存在的表上,新增一个列族。
     * 这是最常见的兼容性变更。
     * TDD Cycle: Red -> Green -> Refactor
     * 1. Red: 先写这个测试,此时 applyChange 方法还没实现,肯定是失败的。
     * 2. Green: 实现 applyChange 逻辑,让测试通过。
     * 3. Refactor: 重构代码。
     */
    @Test
    @DisplayName("TDD: Should successfully add a new column family to an existing table")
    void testAddColumnFamily() throws IOException {
        // 1. Arrange: 准备一个已存在的表状态
        TableName tableName = TableName.valueOf("data-engineering:user_profiles");
        TableDescriptorBuilder tableDescBuilder = TableDescriptorBuilder.newBuilder(tableName)
            .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("info".getBytes()).build());
        admin.createTable(tableDescBuilder.build());

        // 2. Act: 加载新的 YAML 定义 (模拟),这个定义比现有的多一个 'metrics' 列族
        HBaseTable newState = loadStateFromYaml("path/to/new_user_profiles_with_metrics.yaml");
        HBaseSchemaValidator validator = new HBaseSchemaValidator(admin);

        // applyChange 是我们要实现的核心方法
        validator.applyChange(newState);

        // 3. Assert: 验证表的最终状态是否符合预期
        assertTrue(admin.tableExists(tableName));
        var tableDescriptor = admin.getDescriptor(tableName);
        assertNotNull(tableDescriptor.getColumnFamily("info".getBytes()), "Original CF 'info' should exist");
        assertNotNull(tableDescriptor.getColumnFamily("metrics".getBytes()), "New CF 'metrics' should be added");
        assertEquals(2, tableDescriptor.getColumnFamilyCount());
    }

    /**
     * 测试场景: 尝试删除一个正在使用的列族。
     * 这是一个破坏性变更,验证器必须拒绝它。
     */
    @Test
    @DisplayName("TDD: Should throw exception when attempting to delete a column family")
    void testDeleteColumnFamily() throws IOException {
        // 1. Arrange: 准备一个包含两个列族的表
        TableName tableName = TableName.valueOf("data-engineering:user_profiles");
        TableDescriptorBuilder tableDescBuilder = TableDescriptorBuilder.newBuilder(tableName)
            .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("info".getBytes()).build())
            .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("metrics".getBytes()).build());
        admin.createTable(tableDescBuilder.build());

        // 2. Act: 加载一个只包含 'info' 列族的 YAML,意味着 'metrics' 被删除
        HBaseTable newState = loadStateFromYaml("path/to/user_profiles_without_metrics.yaml");
        HBaseSchemaValidator validator = new HBaseSchemaValidator(admin);
        
        // 3. Assert: 验证 applyChange 方法会抛出预期的异常
        // 这里的关键在于,我们的策略是不允许直接删除列族,必须走特殊的人工审批流程。
        assertThrows(IncompatibleSchemaChangeException.class, () -> {
            validator.applyChange(newState);
        }, "Deleting a column family should be prohibited by the validator");
    }
    
    // ... 其他测试用例,例如:
    // - testModifyColumnFamilyProperties_Compatible()
    // - testModifyColumnFamilyProperties_Incompatible() // e.g., changing compression might require major compaction
    // - testAddAcl()
    // - testRemoveAcl()
    // - testModifyAcl()
    // - testInvalidYamlSyntax()
    
    // 这是一个辅助方法,用于从 YAML 文件加载并解析成我们的 Java 对象
    private HBaseTable loadStateFromYaml(String path) {
        // In a real implementation, this would use a library like Jackson or SnakeYAML
        // to parse the YAML file into an HBaseTable POJO.
        // For this example, we'll just mock the object creation.
        // ... implementation omitted for brevity ...
        return new HBaseTable(); // Placeholder
    }
}

这个测试套件是我们的质量门禁。在 CI pipeline 中,我们会拉取 master 分支的 YAML 定义作为“当前状态”,然后将 PR 中的 YAML 作为“目标状态”,在内存中的 MiniHBaseCluster 里进行模拟变更。只有当所有测试用 case 通过,PR 才允许被合并。

第三步:将验证后的状态打包为 OCI 工件

一旦 CI 通过,就意味着这个状态变更是安全的。现在我们将它打包成一个不可变的、有版本的工件。我们使用 oras CLI 工具来完成这个任务。

CI 脚本(例如在 .gitlab-ci.yml 或 GitHub Actions 中):

# .gitlab-ci.yml
# ... a portion of the CI/CD pipeline

# 假设之前的 test 阶段已经成功
package-and-push:
  stage: deploy
  image:
    name: oras-project/oras:latest
    entrypoint: [""]
  script:
    # 登录到我们的 OCI 仓库 (e.g., Harbor)
    - echo "$CI_REGISTRY_PASSWORD" | oras login $CI_REGISTRY -u $CI_REGISTRY_USER --password-stdin
    
    # 将所有相关的 YAML 文件打包
    # --config /dev/null:application/vnd.unknown.config.v1+json 是一个技巧,因为我们不需要一个 config 层
    # :application/vnd.hbase.schema.v1+yaml 是我们自定义的 mediaType,用于标识文件类型
    - >
      oras push "$CI_REGISTRY_IMAGE/hbase-schemas:$CI_COMMIT_TAG" \
      --config /dev/null:application/vnd.unknown.config.v1+json \
      schemas/user_profiles.yaml:application/vnd.hbase.schema.v1+yaml \
      schemas/activity_stream.yaml:application/vnd.hbase.schema.v1+yaml
      
  only:
    - tags # 只在打 tag 时触发,确保版本的语义化

执行后,我们的 OCI 仓库里就会有一个类似 my-registry.corp.com/data/hbase-schemas:v1.2.0 的工件。这个工件包含了该版本下所有 HBase 表的声明式定义。

第四步:配置 Flux CD 进行自动化应用

现在轮到 Flux CD 出场了。我们需要配置两个核心资源:OCIRepositoryKustomization

flux-system/hbase-schemas-source.yaml:

# 告诉 Flux CD 监控哪个 OCI 仓库
apiVersion: source.toolkit.fluxcd.io/v1beta2
kind: OCIRepository
metadata:
  name: hbase-schemas
  namespace: flux-system
spec:
  interval: 5m # 每5分钟检查一次新版本
  url: oci://my-registry.corp.com/data/hbase-schemas
  ref:
    semver: ">=1.0.0" # 使用 SemVer 范围来自动拉取最新的稳定版

data-engineering/hbase-reconciler.yaml:

# 这个 Kustomization 会应用 OCI 工件中的内容
apiVersion: kustomize.toolkit.fluxcd.io/v1beta2
kind: Kustomization
metadata:
  name: hbase-state-apply
  namespace: data-engineering
spec:
  interval: 10m
  prune: true # 自动清理由这个 Kustomization 创建的旧资源 (比如上一个版本的 Job)
  sourceRef:
    kind: OCIRepository
    name: hbase-schemas
    namespace: flux-system
  # 这里的 path 指向工件内部,我们假设所有 YAML 都在根目录
  path: "./"
  # 这里是关键,我们设置了 health checks
  # Flux 会等待 Job 成功完成,否则 Kustomization 会处于 NotReady 状态
  healthChecks:
    - apiVersion: batch/v1
      kind: Job
      name: hbase-reconciler-job
      namespace: data-engineering

但是,OCI 工件里的 HBaseTable YAML 并不是 Kubernetes 原生资源,Kustomization 控制器直接应用会报错。所以,工件里不能直接放 HBaseTable YAML。我们需要放一个能被 Kubernetes 理解的资源,比如一个 Job

修改打包逻辑,我们不在 OCI 工件里放 HBaseTable YAML,而是放一个 Job 的模板,并将 HBaseTable 的内容作为 ConfigMap 打包进去。

修改后的 OCI 工件内容结构:

/
├── job-reconciler.yaml
└── schemas/
    ├── user_profiles.yaml
    └── ...

oras 推送命令变为:

oras push "$CI_REGISTRY_IMAGE/hbase-schemas:$CI_COMMIT_TAG" \
  --config /dev/null:application/vnd.unknown.config.v1+json \
  job/reconciler-job.yaml:application/vnd.kubernetes.v1+yaml \
  schemas/user_profiles.yaml:application/vnd.hbase.schema.v1+yaml \
  # ... other schemas

然后,Flux 的 Kustomization 只需要应用 job/ 路径下的内容。

job/reconciler-job.yaml:

apiVersion: batch/v1
kind: Job
metadata:
  # 使用 OCI 工件的版本号作为 Job 名称的一部分,确保每次都是新 Job
  name: hbase-reconciler-job-${OCI_VERSION} 
  namespace: data-engineering
  annotations:
    # 这一点很重要,告诉 Flux 这是来自哪个源的,用于垃圾回收
    kustomize.toolkit.fluxcd.io/prune: enabled
spec:
  template:
    spec:
      containers:
      - name: hbase-reconciler
        # 我们自己构建的 reconciler 镜像,包含了 HBase client 和解析逻辑
        image: my-registry.corp.com/tools/hbase-reconciler:latest
        env:
        - name: OCI_SOURCE_URL
          # Flux 会自动注入这些变量,让 reconciler 知道去哪里下载完整的工件内容
          value: ${OCI_SOURCE_URL}
        - name: OCI_SOURCE_REVISION
          value: ${OCI_SOURCE_REVISION}
        # 这里需要配置 ServiceAccount 和 Secret 来访问 HBase 集群
        # ...
      restartPolicy: Never
  backoffLimit: 1

这个 Job 启动后,内部的 reconciler 脚本/程序会:

  1. 从环境变量拿到 OCI 工件的 URL 和版本。
  2. 使用 oras 或其他库将完整的工件内容拉取到 Pod 内部。
  3. 遍历 schemas/ 目录下的所有 HBaseTable YAML 文件。
  4. 连接到 HBase 集群,获取表的当前状态。
  5. 计算当前状态和期望状态的差异 (diff)。
  6. 生成并执行必要的 HBase API 调用(admin.createTable, admin.modifyTable, grant 等)来弥合差异。
  7. 所有操作成功,Job 正常退出 (Exit Code 0)。任何失败,Job 失败退出,Flux 会报告 Kustomization 同步失败,并触发告警。

方案的局限性与未来展望

这个基于 TDD、OCI 和 Flux CD 的方案,极大地提升了我们管理 HBase 状态的可靠性和自动化程度。所有的变更都有记录、可审计、经过测试。然而,它并非银弹。

首先,当前的 Reconciler 是在一个一次性的 Job 中运行的,它缺乏一个持续调谐循环。如果有人绕过此流程手动修改了 HBase,系统不会自动纠正,直到下一次 OCI 工件更新。一个完整的 Kubernetes Operator,带有自己的 CRD (HBaseTable) 和一个常驻的 controller,将是最终的演进方向。这个 controller 可以定期检查 HBase 的实际状态,确保它始终与 Git/OCI 中声明的状态一致。

其次,该方案主要解决了 schema 和 ACL 的元数据管理,但没有触及更复杂的数据迁移问题。例如,当一个列族被重命名或数据格式需要变更时,需要同步进行数据重写。这通常需要更复杂的、与业务逻辑紧密耦合的工作流,比如运行 Spark 或 Flink 作业。当前这套流程可以作为触发这类复杂工作流的起点,但不能完全覆盖其生命周期。

最后,TDD 测试套件的完备性至关重要。任何未被测试覆盖的边缘变更场景,都可能成为潜在的风险点。维护和扩充这个测试套件,使其能覆盖更多罕见但致命的兼容性问题,将是一项持续的投入。


  目录