基于 Scikit-learn 和 Prometheus 构建可测试的实时指标异常检测服务


团队维护的 Prometheus 告警规则最近正以惊人的速度膨胀。最初基于静态阈值的规则简单有效,但随着系统复杂性增加,我们开始遇到瓶颈。cpu_usage > 90% 这样的规则无法捕捉到那些多指标之间微妙的、非线性的关联异常。例如,某个服务的 CPU 使用率可能只有 60%,但其响应延迟和队列深度却在以非正常模式同步增长。这种问题,静态阈值束手无策。

引入机器学习模型进行异常检测似乎是顺理成章的下一步。然而,一个常见的错误是将 ML 项目仅仅视为数据科学问题,忽略了其作为生产环境服务的工程健壮性。一个无法被有效测试、监控和维护的 ML 服务,其带来的风险远大于它解决的问题。这次复盘,记录了我们如何从零开始,构建一个生产级的、以单元测试为核心的、消费 Prometheus 指标的 Scikit-learn 异常检测服务。

第一版原型:能跑,但仅此而已

最初的构想非常直接:一个 Python 脚本,定期通过 Prometheus HTTP API 拉取数据,用预训练好的 IsolationForest 模型打分,然后将异常点打印到日志。

# WARNING: 这是个反面教材,请勿在生产中使用
import requests
import time
import pickle
import numpy as np

PROMETHEUS_URL = 'http://localhost:9090/api/v1/query'
QUERY = 'sum(rate(node_cpu_seconds_total{mode="idle"}[5m])) by (instance)'
MODEL_PATH = 'isolation_forest_model.pkl'

# 加载一个预训练好的模型
with open(MODEL_PATH, 'rb') as f:
    model = pickle.load(f)

def fetch_and_predict():
    try:
        response = requests.get(PROMETHEUS_URL, params={'query': QUERY})
        response.raise_for_status()
        results = response.json()['data']['result']
        
        for result in results:
            # 简化处理,只取值
            value = float(result['value'][1])
            prediction = model.predict(np.array([[value]]))
            
            if prediction[0] == -1: # -1 代表异常
                instance = result['metric']['instance']
                print(f"异常检测! Instance: {instance}, Value: {value}")

    except requests.RequestException as e:
        print(f"查询 Prometheus 失败: {e}")
    except Exception as e:
        print(f"处理数据失败: {e}")

if __name__ == '__main__':
    while True:
        fetch_and_predict()
        time.sleep(60)

这个脚本能工作。但在真实项目中,它几乎是不可维护的。

  1. 强耦合: 网络请求、数据解析、模型预测和结果输出全部耦合在一个函数里。
  2. 不可测试: 如何测试 fetch_and_predict?我们必须启动一个真实的 Prometheus 实例,并确保其中有特定数据。网络抖动、Prometheus 宕机都会导致测试失败,这违背了单元测试稳定、快速、隔离的原则。
  3. 状态缺失: 它只是打印日志,我们无法从外部观测其运行状态。它上次成功查询是什么时候?处理了多少条时间序列?预测时有没有出错?这些都无从得知。
  4. 配置硬编码: 所有配置都写死在代码里。

这个原型让我们明确了痛点:我们需要的是一个服务,而不是一个脚本。一个遵循“关注点分离”原则、具备良好可测试性和可观测性的服务。

架构重新设计:面向测试和服务化

我们的目标是构建一个可以独立部署和监控的微服务。其核心设计思想是分层和解耦。

graph TD
    subgraph Anomaly Detection Service
        A[Service Runner] --> B{Prediction Loop};
        B --> C[Prometheus Client];
        B --> D[Model Wrapper];
        B --> E[Metrics Exporter];
        C -- Fetches Metrics --> F[(Prometheus API)];
        D -- Loads Model --> G[(Model File)];
        E -- Exposes /metrics --> H[(Prometheus Scraper)];
    end

    subgraph Testing Environment
        I[Pytest Runner] --> J[TestPredictionService];
        J -- Mocks --> K[Mock Prometheus Client];
        J -- Mocks --> L[Mock Model Wrapper];
        J -- Injects Mocks --> B;
        J -- Asserts on --> E;
    end

    style Testing Environment fill:#f9f,stroke:#333,stroke-width:2px

我们将服务拆分为几个关键组件:

  • PrometheusClient: 专职负责与 Prometheus API 交互。它封装了网络请求、认证(如果需要)、错误处理和响应解析。它的唯一职责就是根据查询返回结构化的数据。
  • ModelWrapper: 封装 Scikit-learn 模型。它负责加载模型文件、数据预处理(例如,特征缩放)、执行预测,并返回一个标准化的结果。这层抽象让我们未来可以轻松替换成 XGBoostPyTorch 模型,而无需改动上层业务逻辑。
  • MetricsExporter: 使用 prometheus_client 库,将服务自身的状态和模型的预测结果以 Prometheus 指标的格式暴露出去。这是实现“对监控系统进行监控”的关键。
  • PredictionService: 业务逻辑的核心,负责编排上述组件。它驱动整个预测流程:调用 PrometheusClient 获取数据,传递给 ModelWrapper 进行预测,最后通过 MetricsExporter 更新暴露的指标。

这种设计的最大优势在于可测试性。在单元测试中,我们可以轻易地用 Mock 对象替换 PrometheusClientModelWrapper,从而在完全离线的环境中,精准地测试 PredictionService 的业务逻辑。

核心组件实现与生产级考量

1. PrometheusClient:健壮的数据源

这个客户端不仅仅是一个 requests.get 的封装。在真实项目中,我们需要考虑超时、重试、以及对 Prometheus 返回各种数据格式的健壮解析。

# service/prometheus_client.py
import requests
import logging
from typing import List, Dict, Any

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class PrometheusClient:
    """
    一个健壮的 Prometheus API 客户端,负责查询和解析数据。
    """
    def __init__(self, api_url: str, timeout: int = 10):
        if not api_url.endswith('/api/v1'):
            # 避免常见的配置错误
            self.api_url = f"{api_url.rstrip('/')}/api/v1"
        else:
            self.api_url = api_url
        self.timeout = timeout
        self.query_url = f"{self.api_url}/query"

    def query(self, promql: str) -> List[Dict[str, Any]]:
        """
        执行 PromQL 查询并返回结果列表。
        :param promql: 要执行的 PromQL 表达式。
        :return: 一个包含时间序列数据的字典列表。
                 例如: [{'metric': {'label1': 'value1'}, 'value': [timestamp, 'value_str']}]
        :raises ConnectionError: 如果网络请求失败。
        :raises ValueError: 如果 Prometheus 返回非成功状态或数据格式不正确。
        """
        logging.info(f"正在执行查询: {promql}")
        try:
            response = requests.get(
                self.query_url,
                params={'query': promql},
                timeout=self.timeout
            )
            response.raise_for_status()  # 如果状态码不是 2xx,则抛出 HTTPError

            data = response.json()
            if data.get('status') != 'success':
                error_type = data.get('errorType', 'Unknown')
                error_msg = data.get('error', 'No error message provided')
                logging.error(f"Prometheus 查询失败: {error_type} - {error_msg}")
                raise ValueError(f"Prometheus query failed: {error_type} - {error_msg}")

            return data.get('data', {}).get('result', [])

        except requests.exceptions.RequestException as e:
            logging.error(f"连接 Prometheus 失败: {e}")
            raise ConnectionError(f"Failed to connect to Prometheus at {self.api_url}") from e
        except (KeyError, IndexError, ValueError) as e:
            logging.error(f"解析 Prometheus 响应失败: {e}")
            raise ValueError("Invalid response format from Prometheus") from e

这里的坑在于:必须处理 status 不为 success 的情况,并记录下 errorTypeerror,这对于调试 PromQL 语法错误至关重要。

2. ModelWrapper:隔离机器学习的复杂性

ModelWrapper 的职责是让业务代码感觉不到 Scikit-learn 的存在。它提供一个干净的 predict 接口。

# service/model_wrapper.py
import pickle
import numpy as np
import logging
from typing import List

class ModelWrapper:
    """
    封装 Scikit-learn 模型,提供加载和预测的接口。
    """
    def __init__(self, model_path: str):
        self.model_path = model_path
        self._model = None
        self.load_model()

    def load_model(self):
        """
        从文件加载模型。如果失败,服务将无法启动。
        """
        try:
            logging.info(f"正在从 {self.model_path} 加载模型...")
            with open(self.model_path, 'rb') as f:
                self._model = pickle.load(f)
            logging.info("模型加载成功。")
        except FileNotFoundError:
            logging.error(f"模型文件未找到: {self.model_path}")
            raise
        except (pickle.UnpicklingError, EOFError) as e:
            logging.error(f"加载模型文件失败,文件可能已损坏: {e}")
            raise

    def predict(self, features: List[List[float]]) -> np.ndarray:
        """
        使用加载的模型进行预测。
        :param features: 一个二维列表,每行代表一个样本的特征。
        :return: 一个 numpy 数组,包含每个样本的预测结果 (-1 for anomaly, 1 for normal)。
        """
        if not features:
            return np.array([])
            
        if self._model is None:
            logging.error("模型未加载,无法进行预测。")
            raise RuntimeError("Model is not loaded.")

        try:
            # 转换为 numpy 数组,这是 scikit-learn 的标准输入格式
            feature_array = np.array(features)
            # 在真实项目中,这里可能还需要进行特征缩放等预处理
            return self._model.predict(feature_array)
        except Exception as e:
            logging.error(f"模型预测时发生错误: {e}")
            # 避免因单个预测失败而导致整个服务崩溃
            # 返回一个空数组,上层逻辑需要处理这种情况
            return np.array([])

一个关键点是在 __init__ 中直接调用 load_model。这意味着如果模型文件有问题,服务在启动时就会失败。这是一种“快速失败”的设计哲学,避免服务在运行时才发现这个致命错误。

3. MetricsExporter:暴露内部状态

这是服务的“仪表盘”。我们不仅要暴露模型的预测结果,更要暴露服务自身的健康状况。

# service/metrics_exporter.py
from prometheus_client import Gauge, Counter, Histogram
import logging

class MetricsExporter:
    """
    管理所有需要暴露给 Prometheus 的指标。
    """
    def __init__(self, service_name: str = "anomaly_detection_service"):
        # 业务指标: 每个时间序列的异常得分
        self.anomaly_score = Gauge(
            f'{service_name}_anomaly_score',
            'Anomaly score for a given time series. -1 for anomaly, 1 for normal.',
            ['instance', 'job'] # 这里的标签应该与被监控的指标标签保持一致
        )
        # 服务健康指标
        self.predictions_total = Counter(
            f'{service_name}_predictions_total',
            'Total number of predictions made.'
        )
        self.errors_total = Counter(
            f'{service_name}_errors_total',
            'Total number of errors encountered.',
            ['error_type'] # 按错误类型分类,例如 'prometheus_query', 'model_predict'
        )
        self.last_successful_run_timestamp = Gauge(
            f'{service_name}_last_successful_run_timestamp',
            'Timestamp of the last successful prediction run.'
        )
        self.prediction_latency = Histogram(
            f'{service_name}_prediction_latency_seconds',
            'Latency of the prediction loop in seconds.'
        )
        logging.info("MetricsExporter 初始化完成。")

    def report_anomaly_score(self, labels: dict, score: int):
        """
        报告一个时间序列的异常分数。
        :param labels: 包含该时间序列所有标签的字典。
        :param score: 模型的预测结果 (-1 或 1)。
        """
        # .labels() 方法会返回一个特定标签组合的子指标
        self.anomaly_score.labels(**labels).set(score)

    def increment_error(self, error_type: str):
        self.errors_total.labels(error_type=error_type).inc()
        
    def increment_prediction(self):
        self.predictions_total.inc()

anomaly_score 使用 Gauge 类型,因为它会上下波动。predictions_totalerrors_total 使用 Counter,因为它们是只增不减的计数器。使用 Histogram 来度量 prediction_latency,可以让我们后续计算分位数(如 p99 延迟),比简单的 GaugeSummary 提供更多信息。

单元测试:质量保障的核心

现在,所有组件都已解耦,我们可以为它们编写隔离的、可靠的单元测试了。我们将使用 pytestunittest.mock

测试 PrometheusClient

我们需要模拟 requests.get 的行为,包括成功、HTTP 错误和返回格式错误的情况。

# tests/test_prometheus_client.py
import pytest
from unittest.mock import patch, MagicMock
from service.prometheus_client import PrometheusClient

@pytest.fixture
def client():
    return PrometheusClient(api_url="http://fake-prometheus:9090")

# 模拟一个成功的 Prometheus API 响应
MOCK_SUCCESS_RESPONSE = {
    "status": "success",
    "data": {
        "resultType": "vector",
        "result": [
            {
                "metric": {"instance": "node1", "job": "node_exporter"},
                "value": [1672531200, "0.85"]
            }
        ]
    }
}

# 模拟一个查询失败的响应
MOCK_FAILURE_RESPONSE = {
    "status": "error",
    "errorType": "bad_data",
    "error": "invalid expression"
}

@patch('requests.get')
def test_query_success(mock_get, client):
    """测试成功的查询"""
    mock_response = MagicMock()
    mock_response.status_code = 200
    mock_response.json.return_value = MOCK_SUCCESS_RESPONSE
    mock_get.return_value = mock_response

    result = client.query("some_query")
    
    mock_get.assert_called_once_with(
        client.query_url,
        params={'query': 'some_query'},
        timeout=10
    )
    assert result == MOCK_SUCCESS_RESPONSE['data']['result']

@patch('requests.get')
def test_query_http_error(mock_get, client):
    """测试网络或HTTP错误"""
    mock_response = MagicMock()
    mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError
    mock_get.return_value = mock_response

    with pytest.raises(ConnectionError):
        client.query("some_query")

@patch('requests.get')
def test_query_prometheus_error(mock_get, client):
    """测试 Prometheus 返回 status=error 的情况"""
    mock_response = MagicMock()
    mock_response.status_code = 200
    mock_response.json.return_value = MOCK_FAILURE_RESPONSE
    mock_get.return_value = mock_response

    with pytest.raises(ValueError, match="Prometheus query failed: bad_data"):
        client.query("some_query")

@patch('requests.get')
def test_query_invalid_json(mock_get, client):
    """测试返回的 JSON 格式不正确"""
    mock_response = MagicMock()
    mock_response.status_code = 200
    # 返回一个缺少 'data' 键的响应
    mock_response.json.return_value = {"status": "success"}
    mock_get.return_value = mock_response
    
    with pytest.raises(ValueError, match="Invalid response format from Prometheus"):
        client.query("some_query")

测试 PredictionService 的集成逻辑

这是最重要的测试。我们不关心底层模型或网络请求的真实性,只关心 PredictionService 是否能正确地调用它的依赖项,并根据它们的返回值更新指标。

# service/prediction_service.py
import time
import logging
from .prometheus_client import PrometheusClient
from .model_wrapper import ModelWrapper
from .metrics_exporter import MetricsExporter

class PredictionService:
    def __init__(
        self,
        client: PrometheusClient,
        model: ModelWrapper,
        exporter: MetricsExporter,
        query: str
    ):
        self.client = client
        self.model = model
        self.exporter = exporter
        self.query = query

    def run_once(self):
        logging.info("开始新一轮预测...")
        with self.exporter.prediction_latency.time():
            try:
                time_series = self.client.query(self.query)
                if not time_series:
                    logging.warning("查询返回空结果。")
                    return

                # 准备模型输入
                labels_list = []
                features_list = []
                for ts in time_series:
                    # 假设我们只使用值作为特征
                    # 真实场景可能更复杂,需要多个查询和特征工程
                    labels_list.append(ts.get('metric', {}))
                    features_list.append([float(ts['value'][1])])
                
                # 获取预测结果
                predictions = self.model.predict(features_list)
                
                # 上报指标
                for labels, prediction in zip(labels_list, predictions):
                    self.exporter.report_anomaly_score(labels, int(prediction))
                    self.exporter.increment_prediction()
                
                self.exporter.last_successful_run_timestamp.set_to_current_time()
                logging.info(f"预测完成,处理了 {len(time_series)} 条时间序列。")

            except ConnectionError:
                self.exporter.increment_error(error_type='prometheus_query')
            except ValueError as e:
                logging.error(f"数据处理或模型输入错误: {e}")
                self.exporter.increment_error(error_type='data_processing')
            except RuntimeError as e:
                logging.error(f"模型预测运行时错误: {e}")
                self.exporter.increment_error(error_type='model_predict')

对应的测试代码:

# tests/test_prediction_service.py
from unittest.mock import MagicMock
import numpy as np
from service.prediction_service import PredictionService

def test_run_once_success_flow():
    """测试 PredictionService 成功执行的完整流程"""
    # 1. 准备 Mock 对象
    mock_client = MagicMock(spec=PrometheusClient)
    mock_model = MagicMock(spec=ModelWrapper)
    mock_exporter = MagicMock() # 使用普通 MagicMock 更容易断言
    
    # 2. 设定 Mock 对象的行为
    mock_prometheus_response = [
        {'metric': {'instance': 'node1'}, 'value': [0, '10.0']},
        {'metric': {'instance': 'node2'}, 'value': [0, '100.0']},
    ]
    mock_client.query.return_value = mock_prometheus_response
    
    mock_model_prediction = np.array([1, -1]) # node1 正常, node2 异常
    mock_model.predict.return_value = mock_model_prediction

    # 3. 初始化被测服务
    service = PredictionService(mock_client, mock_model, mock_exporter, query="fake_query")
    
    # 4. 执行被测方法
    service.run_once()

    # 5. 断言
    # 确认依赖项被正确调用
    mock_client.query.assert_called_once_with("fake_query")
    # 确认模型接收了正确格式的特征
    # np.testing.assert_array_equal 用于比较 numpy 数组
    np.testing.assert_array_equal(
        mock_model.predict.call_args[0][0], 
        np.array([[10.0], [100.0]])
    )

    # 确认指标被正确上报
    assert mock_exporter.report_anomaly_score.call_count == 2
    mock_exporter.report_anomaly_score.assert_any_call({'instance': 'node1'}, 1)
    mock_exporter.report_anomaly_score.assert_any_call({'instance': 'node2'}, -1)

    assert mock_exporter.increment_prediction.call_count == 2
    mock_exporter.last_successful_run_timestamp.set_to_current_time.assert_called_once()
    mock_exporter.increment_error.assert_not_called()

def test_run_once_handles_query_error():
    """测试当 Prometheus 查询失败时,服务能正确处理并上报错误"""
    mock_client = MagicMock(spec=PrometheusClient)
    mock_model = MagicMock(spec=ModelWrapper)
    mock_exporter = MagicMock()
    
    mock_client.query.side_effect = ConnectionError("Connection failed")
    
    service = PredictionService(mock_client, mock_model, mock_exporter, query="fake_query")
    service.run_once()
    
    mock_model.predict.assert_not_called()
    mock_exporter.increment_error.assert_called_once_with(error_type='prometheus_query')
    mock_exporter.report_anomaly_score.assert_not_called()

这些测试覆盖了核心的业务逻辑,确保了数据在各个组件之间的正确流动和转换,以及在异常情况下的正确响应。它们完全不依赖于网络或文件系统,执行速度极快,可以轻松集成到 CI 流水线中。

服务的最终形态与局限性

最终,我们将所有组件组装在一个简单的 Web 服务器中,它只做一件事:暴露 /metrics 端点供 Prometheus 抓取。

# main.py
import os
import time
from prometheus_client import start_http_server
from service.prometheus_client import PrometheusClient
from service.model_wrapper import ModelWrapper
from service.metrics_exporter import MetricsExporter
from service.prediction_service import PredictionService

if __name__ == '__main__':
    # 从环境变量获取配置,这是云原生应用的最佳实践
    PROMETHEUS_URL = os.getenv("PROMETHEUS_URL", "http://localhost:9090")
    MODEL_PATH = os.getenv("MODEL_PATH", "model.pkl")
    QUERY = os.getenv("PROMQL_QUERY", 'vector(1)')
    LISTEN_PORT = int(os.getenv("LISTEN_PORT", 8000))
    SCRAPE_INTERVAL = int(os.getenv("SCRAPE_INTERVAL", 60))

    # 1. 初始化所有组件
    client = PrometheusClient(api_url=PROMETHEUS_URL)
    model = ModelWrapper(model_path=MODEL_PATH)
    exporter = MetricsExporter()
    service = PredictionService(
        client=client,
        model=model,
        exporter=exporter,
        query=QUERY
    )

    # 2. 启动 HTTP 服务器以暴露指标
    start_http_server(LISTEN_PORT)
    
    # 3. 运行主循环
    while True:
        service.run_once()
        time.sleep(SCRAPE_INTERVAL)

这个方案解决了我们最初的所有痛点。它是一个健壮、可测试、可观测的服务。然而,它并非没有局限性。当前的设计使用了一个静态的、预训练好的模型。在真实世界中,数据分布会随着时间漂移,模型性能会逐渐下降。一个完整的 MLOps 闭环需要包含一个自动化的模型再训练、验证和部署流水线。例如,我们可以定期从 Prometheus 导出历史数据作为训练集,训练新模型,在影子环境中与旧模型进行 A/B 测试,只有当新模型表现更优时,才将其部署到生产环境,替换掉旧的 model.pkl 文件。

此外,对于更复杂的异常模式,单一的 PromQL 查询可能不足以构建有效的特征。可能需要执行多次查询,并将结果在时间维度或标签维度上进行拼接,形成更丰富的特征向量。这会增加 PredictionService 中特征工程的复杂性,同时也对我们的单元测试提出了更高的要求,需要模拟更复杂的 PrometheusClient 返回值。这些都是我们下一步迭代的方向。


  目录