构建统一数据血缘平台:整合 Trino 查询日志、ArangoDB 图模型与 Elasticsearch 索引


在一个成熟的数据平台中,最令人恐惧的问题不是查询性能,也不是存储成本,而是“我无法确定修改这张表的这个字段会影响下游哪些报表和应用”。数据资产间的依赖关系模糊不清,形成一张看不见的、错综复杂的网,任何微小的改动都可能引发雪崩式的故障。这就是数据血缘(Data Lineage)缺失导致的典型困境。我们需要构建一个系统,不仅能回答“这张表从哪里来”,更能精确回答“它将到哪里去”。

定义问题:元数据管理的双重挑战

数据血缘的核心是关系,而元数据管理则面临两个截然不同的挑战:

  1. 关系查询(Traversal): 高效地进行多层、不定深度的依赖追溯。例如,从一个数据湖的原始分区表出发,找到所有直接或间接依赖它的 BI 报表、机器学习特征,甚至是某个特定 UI 组件中使用的数据集。这种查询是典型的图遍历。
  2. 内容检索(Search): 在海量的元数据中快速进行全文搜索。例如,根据一个业务术语(如“用户活跃度”)、字段注释、或者表负责人,找到所有相关的数据资产。这是搜索引擎的专长。

许多团队试图用单一数据库解决这两个问题,这往往是架构灾难的开始。

方案A:万物皆索引——Elasticsearch-Only 架构

一个看似直接的方案是将所有元数据,包括血缘关系,都建模成 JSON 文档存入 Elasticsearch。

  • 模型设计:

    • 一个 assets 索引,每个文档代表一个数据资产(表、字段、仪表盘等)。
    • 每个文档包含 upstream_assetsdownstream_assets 数组,直接存储其一级依赖关系。
  • 优势:

    • 实现简单,技术栈统一。
    • 全文检索能力强大,对于元数据的内容发现非常友好。
  • 致命缺陷:

    • 深度遍历的性能灾难: 查询一个表的所有下游依赖,需要先获取其 downstream_assets,然后以这些 ID 为参数发起第二次查询,以此类推。当血缘深度达到 5 层以上时,查询次数呈指数级增长,网络开销和查询延迟变得无法接受。在真实项目中,这种“N+1”查询模式很快会拖垮系统。
    • 关系更新的复杂性: 当一个 ETL 任务发生变更,例如增加了一个上游输入表,你需要同时更新这个任务节点、新的上游表节点、以及所有下游节点的文档。这使得事务控制变得异常复杂,极易出现数据不一致。
    • 模型冗余: 关系数据被冗余地存储在多个文档中,增加了存储成本和更新的复杂度。

这个方案在血缘关系简单、深度较浅的场景下或许可用,但对于复杂的现代数据仓库和数据湖环境,它完全无法胜任。

方案B:万物皆图——ArangoDB-Only 架构

另一个极端是完全拥抱图数据库,例如 ArangoDB。

  • 模型设计:

    • 使用顶点集合(Vertex Collections)存储数据资产,如 tables, columns, jobs
    • 使用边集合(Edge Collections)存储关系,如 reads_from, writes_to
  • 优势:

    • 原生图遍历: 血缘查询是其核心优势。使用 AQL (ArangoDB Query Language) 进行任意深度的遍历查询,性能极高且语法直观。
    • 模型清晰: 数据模型与业务概念(资产与依赖)完全匹配,易于理解和维护。
  • 缺陷:

    • 全文检索能力受限: 虽然 ArangoDB 提供了 ArangoSearch 视图,支持一定的全文检索能力,但其功能、性能和生态成熟度与专用的搜索引擎 Elasticsearch 相比仍有差距。在高并发、复杂查询条件的元数据检索场景下,可能会成为瓶颈。
    • 运维复杂性: 对于习惯了关系型数据库或文档数据库的团队,引入图数据库的学习曲线和运维成本相对较高。

在真实项目中,我们发现用户不仅需要看血缘图,更需要一个强大的入口去“发现”他们关心的资产,而这个发现过程往往依赖于模糊的、非结构化的文本搜索。

最终架构选择:ArangoDB + Elasticsearch 混合模式

我们最终选择的架构,是两种方案的结合,扬长避短。

  • ArangoDB: 作为数据血缘图的**唯一事实来源 (Source of Truth)**。它负责存储所有资产节点和它们之间的依赖关系边。所有关于结构和关系的查询,都由 ArangoDB 处理。
  • Elasticsearch: 作为元数据的全文检索引擎和查询缓存。它存储每个资产的丰富元数据(描述、标签、负责人、字段信息等)的扁平化文档。它不存储关系,只为了快速检索。
  • Trino: 作为血缘信息的关键生产者。通过其 EventListener 插件,我们可以捕获每一次查询完成的事件,从而获得最准确、最及时的表级乃至列级血缘。
  • 统一 API 层: 提供 GraphQL 或 RESTful API,根据请求类型,智能地路由到 ArangoDB(图遍历)或 Elasticsearch(文本搜索)。
  • 专用 UI 组件库: 前端使用定制化的组件库,一个组件负责调用 API 获取图数据并使用 D3.js 或类似库进行可视化,另一个组件负责实现强大的搜索框,与 Elasticsearch API 交互。
graph TD
    subgraph 数据源
        A[数据湖/数仓 HDFS/S3]
    end

    subgraph 查询与处理
        B(Trino集群) -- 查询 --> A
        B -- QueryCompletedEvent --> C{EventListener}
    end

    subgraph 血缘处理管道
        C -- 事件 --> D[Kafka/Pulsar]
        E[Lineage Parser Service] -- 消费 --> D
        E -- 解析SQL --> F{识别源/目标表}
    end

    subgraph 统一元数据存储
        F -- 写入图关系 --> G[ArangoDB]
        F -- 写入/更新元数据文档 --> H[Elasticsearch]
    end

    subgraph 应用层
        I[Unified Metadata API] -- 图查询 --> G
        I -- 文本搜索 --> H
        J[专用UI组件库] -- API调用 --> I
    end

    subgraph 用户
        K(数据分析师/工程师) -- 使用 --> J
    end

这种架构的核心思想是“关注点分离”。图数据库做它最擅长的事,搜索引擎也一样。数据一致性通过事件驱动的管道来保证,从 Trino 捕获的事件是驱动元数据更新的唯一入口。

核心实现概览

1. Trino EventListener

我们需要实现 Trino 的 EventListener 接口,以捕获查询元数据。这是一个生产级的 Java 代码骨架。

// pom.xml dependency
/*
<dependency>
    <groupId>io.trino</groupId>
    <artifactId>trino-spi</artifactId>
    <version>${trino.version}</version>
    <scope>provided</scope>
</dependency>
*/

package io.mycompany.trino.plugin.eventlistener;

import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.eventlistener.QueryCompletedEvent;
import io.trino.spi.eventlistener.QueryContext;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LineageKafkaEventListener implements EventListener {
    private static final Logger log = LogManager.getLogger(LineageKafkaEventListener.class);
    private final KafkaProducer<String, String> producer;
    private final String topic;
    // Use a dedicated thread pool to avoid blocking Trino's event processing thread
    private final ExecutorService executor = Executors.newFixedThreadPool(5);

    public LineageKafkaEventListener(Map<String, String> config) {
        this.topic = config.getOrDefault("lineage.kafka.topic", "trino-lineage-events");
        String bootstrapServers = config.get("lineage.kafka.bootstrap-servers");
        if (bootstrapServers == null) {
            throw new IllegalArgumentException("lineage.kafka.bootstrap-servers is required");
        }
        
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all");
        props.put("retries", 3);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        this.producer = new KafkaProducer<>(props);
        log.info("LineageKafkaEventListener initialized for topic: {}", this.topic);
    }

    @Override
    public void queryCompleted(QueryCompletedEvent queryCompletedEvent) {
        // We only care about successful queries that actually modify or read data
        if (queryCompletedEvent.getIoMetadata().getOutput().isEmpty() && 
            queryCompletedEvent.getIoMetadata().getInputs().isEmpty()) {
            return;
        }
        
        // Asynchronously process the event
        executor.submit(() -> {
            try {
                String eventJson = buildEventJson(queryCompletedEvent);
                ProducerRecord<String, String> record = new ProducerRecord<>(
                    topic, 
                    queryCompletedEvent.getMetadata().getQueryId(), // Use query ID as key for partitioning
                    eventJson
                );
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        log.error("Failed to send lineage event to Kafka for query {}", 
                                  queryCompletedEvent.getMetadata().getQueryId(), exception);
                    }
                });
            } catch (Exception e) {
                log.error("Error processing query completed event: {}", 
                          queryCompletedEvent.getMetadata().getQueryId(), e);
            }
        });
    }
    
    private String buildEventJson(QueryCompletedEvent event) {
        // Using a proper JSON library like Jackson or Gson is recommended in production
        // This is a simplified example for clarity
        QueryContext context = event.getContext();
        String sql = event.getMetadata().getQuery();
        // A common mistake is sending the full SQL. Sanitize it to remove sensitive data if needed.
        sql = sql.replace("\"", "\\\"").replace("\n", "\\n");

        return String.format("{\"queryId\": \"%s\", \"user\": \"%s\", \"source\": \"%s\", \"sql\": \"%s\", \"createTime\": %d, \"endTime\": %d, \"outputTable\": \"%s\"}",
            event.getMetadata().getQueryId(),
            context.getUser(),
            context.getSource().orElse("unknown"),
            sql,
            event.getCreateTime().toEpochMilli(),
            event.getEndTime().toEpochMilli(),
            event.getIoMetadata().getOutput().map(Object::toString).orElse("null")
        );
    }

    @Override
    public void shutdown() {
        executor.shutdown();
        producer.close();
        log.info("LineageKafkaEventListener shut down.");
    }
}

2. 血缘解析服务 (Lineage Parser Service)

这个服务消费 Kafka 消息,使用一个 SQL 解析库(如 Python 的 sqllineage)来提取血缘。

# requirements.txt
# sqllineage
# kafka-python
# arango
# elasticsearch

import json
from kafka import KafkaConsumer
from sqllineage.runner import LineageRunner
from arango import ArangoClient
from elasticsearch import Elasticsearch
import logging

# --- Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
KAFKA_TOPIC = 'trino-lineage-events'
KAFKA_SERVERS = ['kafka:9092']
ARANGO_HOST = 'http://arango:8529'
ARANGO_USER = 'root'
ARANGO_PASS = 'password'
ES_HOST = 'elasticsearch:9200'

# --- Client Initialization ---
# In a real application, handle connections more robustly
try:
    arango_client = ArangoClient(hosts=ARANGO_HOST)
    db = arango_client.db('lineage_db', username=ARANGO_USER, password=ARANGO_PASS)
    # Ensure collections exist
    if not db.has_collection('assets'):
        db.create_collection('assets', edge=False)
    if not db.has_collection('lineage_edge'):
        db.create_collection('lineage_edge', edge=True)
    assets_collection = db.collection('assets')
    lineage_edge_collection = db.collection('lineage_edge')
    
    es_client = Elasticsearch([ES_HOST])
except Exception as e:
    logging.critical(f"Failed to connect to databases: {e}")
    exit(1)


def process_message(msg):
    """
    Parses a lineage event, and updates ArangoDB and Elasticsearch.
    This function must be idempotent.
    """
    try:
        event = json.loads(msg.value.decode('utf-8'))
        sql = event.get('sql')
        if not sql:
            return

        # Use sqllineage to parse dependencies
        result = LineageRunner(sql)
        
        # We only handle simple cases here. Real-world SQL requires more complex logic.
        if not result.target_tables:
            return

        target_table_str = str(list(result.target_tables)[0])
        target_node_key = target_table_str.replace('.', '_')
        
        # 1. Update ArangoDB Graph
        # Upsert target node
        if not assets_collection.get(target_node_key):
            assets_collection.insert({'_key': target_node_key, 'name': target_table_str, 'type': 'TABLE'})
            logging.info(f"Created node: {target_node_key}")

        for source_table in result.source_tables:
            source_table_str = str(source_table)
            source_node_key = source_table_str.replace('.', '_')

            # Upsert source node
            if not assets_collection.get(source_node_key):
                assets_collection.insert({'_key': source_node_key, 'name': source_table_str, 'type': 'TABLE'})
                logging.info(f"Created node: {source_node_key}")

            # Upsert edge
            edge = {
                '_from': f'assets/{source_node_key}',
                '_to': f'assets/{target_node_key}',
                'query_id': event['queryId'],
                'updated_at': event['endTime']
            }
            # Use query_id to make edge updates idempotent
            edge_key = f"{source_node_key}_{target_node_key}_{event['queryId']}"
            if not lineage_edge_collection.get(edge_key):
                edge['_key'] = edge_key
                lineage_edge_collection.insert(edge)
                logging.info(f"Created edge from {source_node_key} to {target_node_key}")

        # 2. Update Elasticsearch Document
        # In a production system, you'd fetch more metadata from a metastore (like Hive Metastore).
        doc = {
            'name': target_table_str,
            'type': 'TABLE',
            'last_updated_user': event['user'],
            'last_updated_time': event['endTime'],
            'tags': ['trino_generated'],
            'description': f"This table was generated by query {event['queryId']}."
        }
        es_client.index(index='assets_metadata', id=target_node_key, body=doc)
        logging.info(f"Indexed document for: {target_node_key}")

    except Exception as e:
        logging.error(f"Failed to process message: {msg.value}. Error: {e}")
        # Here you should have a dead-letter queue strategy

def main():
    consumer = KafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=KAFKA_SERVERS,
        auto_offset_reset='earliest',
        group_id='lineage_parser_group'
    )
    logging.info("Consumer started...")
    for message in consumer:
        process_message(message)

if __name__ == '__main__':
    main()

3. ArangoDB 血缘查询

一旦数据进入 ArangoDB,复杂的血缘追溯就变得非常简单。

查询 hive.prod.fact_sales 的所有直接和间接上游依赖(最多10层):

// AQL Query
WITH assets
FOR v IN 1..10 INBOUND 'assets/hive_prod_fact_sales' lineage_edge
  OPTIONS { uniqueVertices: 'path' }
  RETURN {
    name: v.name,
    type: v.type
  }

查询 hive.staging.user_events 影响的所有下游资产:

// AQL Query
WITH assets
FOR v, e, p IN 1..10 OUTBOUND 'assets/hive_staging_user_events' lineage_edge
  RETURN {
    asset: {
        name: v.name,
        type: v.type
    },
    path_length: LENGTH(p.vertices) - 1
  }

这里的性能是传统数据库通过递归 CTE 或多次应用层 join 无法比拟的。

架构的扩展性与局限性

此架构的优势在于其模块化和可扩展性。我们可以轻松地为 Spark、dbt 或其他数据处理引擎编写新的事件源和解析器,将它们的血缘信息同样注入到这个统一的平台中。API 层和 UI 组件库保持不变,因为它们只与底层的 ArangoDB 和 Elasticsearch 交互。

然而,这个方案并非没有挑战。

  1. SQL 解析的复杂性: sqllineage 库能处理大部分标准 SQL,但对于包含复杂 UDF、动态 SQL 或特定方言的查询,解析可能会失败或不准确。一个生产级系统需要一个持续迭代和优化的解析引擎,甚至可能需要引入人工标注和修正机制。
  2. 数据一致性: ArangoDB 和 Elasticsearch 是两个独立的系统。虽然我们通过单一事件源来保证写入顺序,但在系统异常时仍可能出现两者数据不一致的情况。需要建立定期的对账和修复机制。
  3. 列级血缘的成本: 本文的实现是表级的。要实现列级血缘,解析逻辑会复杂几个数量级,图中的节点和边的数量也会爆炸式增长,对存储和查询性能都提出了更高的要求。这是需要仔细评估投入产出比的。
  4. 运维开销: 维护一个包含 Kafka、Trino、ArangoDB 和 Elasticsearch 的复杂系统,对 DevOps 团队的能力要求很高。每个组件都需要专业的监控、备份和性能调优。

最终,构建这样一个平台不是一个一次性的项目,而是一个持续演进的过程。它解决的是数据平台最核心的治理和可发现性问题,其价值会随着数据资产的增长而愈发凸显。


  目录