团队维护的 Prometheus 告警规则最近正以惊人的速度膨胀。最初基于静态阈值的规则简单有效,但随着系统复杂性增加,我们开始遇到瓶颈。cpu_usage > 90%
这样的规则无法捕捉到那些多指标之间微妙的、非线性的关联异常。例如,某个服务的 CPU 使用率可能只有 60%,但其响应延迟和队列深度却在以非正常模式同步增长。这种问题,静态阈值束手无策。
引入机器学习模型进行异常检测似乎是顺理成章的下一步。然而,一个常见的错误是将 ML 项目仅仅视为数据科学问题,忽略了其作为生产环境服务的工程健壮性。一个无法被有效测试、监控和维护的 ML 服务,其带来的风险远大于它解决的问题。这次复盘,记录了我们如何从零开始,构建一个生产级的、以单元测试为核心的、消费 Prometheus 指标的 Scikit-learn 异常检测服务。
第一版原型:能跑,但仅此而已
最初的构想非常直接:一个 Python 脚本,定期通过 Prometheus HTTP API 拉取数据,用预训练好的 IsolationForest
模型打分,然后将异常点打印到日志。
# WARNING: 这是个反面教材,请勿在生产中使用
import requests
import time
import pickle
import numpy as np
PROMETHEUS_URL = 'http://localhost:9090/api/v1/query'
QUERY = 'sum(rate(node_cpu_seconds_total{mode="idle"}[5m])) by (instance)'
MODEL_PATH = 'isolation_forest_model.pkl'
# 加载一个预训练好的模型
with open(MODEL_PATH, 'rb') as f:
model = pickle.load(f)
def fetch_and_predict():
try:
response = requests.get(PROMETHEUS_URL, params={'query': QUERY})
response.raise_for_status()
results = response.json()['data']['result']
for result in results:
# 简化处理,只取值
value = float(result['value'][1])
prediction = model.predict(np.array([[value]]))
if prediction[0] == -1: # -1 代表异常
instance = result['metric']['instance']
print(f"异常检测! Instance: {instance}, Value: {value}")
except requests.RequestException as e:
print(f"查询 Prometheus 失败: {e}")
except Exception as e:
print(f"处理数据失败: {e}")
if __name__ == '__main__':
while True:
fetch_and_predict()
time.sleep(60)
这个脚本能工作。但在真实项目中,它几乎是不可维护的。
- 强耦合: 网络请求、数据解析、模型预测和结果输出全部耦合在一个函数里。
- 不可测试: 如何测试
fetch_and_predict
?我们必须启动一个真实的 Prometheus 实例,并确保其中有特定数据。网络抖动、Prometheus 宕机都会导致测试失败,这违背了单元测试稳定、快速、隔离的原则。 - 状态缺失: 它只是打印日志,我们无法从外部观测其运行状态。它上次成功查询是什么时候?处理了多少条时间序列?预测时有没有出错?这些都无从得知。
- 配置硬编码: 所有配置都写死在代码里。
这个原型让我们明确了痛点:我们需要的是一个服务,而不是一个脚本。一个遵循“关注点分离”原则、具备良好可测试性和可观测性的服务。
架构重新设计:面向测试和服务化
我们的目标是构建一个可以独立部署和监控的微服务。其核心设计思想是分层和解耦。
graph TD subgraph Anomaly Detection Service A[Service Runner] --> B{Prediction Loop}; B --> C[Prometheus Client]; B --> D[Model Wrapper]; B --> E[Metrics Exporter]; C -- Fetches Metrics --> F[(Prometheus API)]; D -- Loads Model --> G[(Model File)]; E -- Exposes /metrics --> H[(Prometheus Scraper)]; end subgraph Testing Environment I[Pytest Runner] --> J[TestPredictionService]; J -- Mocks --> K[Mock Prometheus Client]; J -- Mocks --> L[Mock Model Wrapper]; J -- Injects Mocks --> B; J -- Asserts on --> E; end style Testing Environment fill:#f9f,stroke:#333,stroke-width:2px
我们将服务拆分为几个关键组件:
-
PrometheusClient
: 专职负责与 Prometheus API 交互。它封装了网络请求、认证(如果需要)、错误处理和响应解析。它的唯一职责就是根据查询返回结构化的数据。 -
ModelWrapper
: 封装 Scikit-learn 模型。它负责加载模型文件、数据预处理(例如,特征缩放)、执行预测,并返回一个标准化的结果。这层抽象让我们未来可以轻松替换成XGBoost
或PyTorch
模型,而无需改动上层业务逻辑。 -
MetricsExporter
: 使用prometheus_client
库,将服务自身的状态和模型的预测结果以 Prometheus 指标的格式暴露出去。这是实现“对监控系统进行监控”的关键。 -
PredictionService
: 业务逻辑的核心,负责编排上述组件。它驱动整个预测流程:调用PrometheusClient
获取数据,传递给ModelWrapper
进行预测,最后通过MetricsExporter
更新暴露的指标。
这种设计的最大优势在于可测试性。在单元测试中,我们可以轻易地用 Mock
对象替换 PrometheusClient
和 ModelWrapper
,从而在完全离线的环境中,精准地测试 PredictionService
的业务逻辑。
核心组件实现与生产级考量
1. PrometheusClient:健壮的数据源
这个客户端不仅仅是一个 requests.get
的封装。在真实项目中,我们需要考虑超时、重试、以及对 Prometheus 返回各种数据格式的健壮解析。
# service/prometheus_client.py
import requests
import logging
from typing import List, Dict, Any
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class PrometheusClient:
"""
一个健壮的 Prometheus API 客户端,负责查询和解析数据。
"""
def __init__(self, api_url: str, timeout: int = 10):
if not api_url.endswith('/api/v1'):
# 避免常见的配置错误
self.api_url = f"{api_url.rstrip('/')}/api/v1"
else:
self.api_url = api_url
self.timeout = timeout
self.query_url = f"{self.api_url}/query"
def query(self, promql: str) -> List[Dict[str, Any]]:
"""
执行 PromQL 查询并返回结果列表。
:param promql: 要执行的 PromQL 表达式。
:return: 一个包含时间序列数据的字典列表。
例如: [{'metric': {'label1': 'value1'}, 'value': [timestamp, 'value_str']}]
:raises ConnectionError: 如果网络请求失败。
:raises ValueError: 如果 Prometheus 返回非成功状态或数据格式不正确。
"""
logging.info(f"正在执行查询: {promql}")
try:
response = requests.get(
self.query_url,
params={'query': promql},
timeout=self.timeout
)
response.raise_for_status() # 如果状态码不是 2xx,则抛出 HTTPError
data = response.json()
if data.get('status') != 'success':
error_type = data.get('errorType', 'Unknown')
error_msg = data.get('error', 'No error message provided')
logging.error(f"Prometheus 查询失败: {error_type} - {error_msg}")
raise ValueError(f"Prometheus query failed: {error_type} - {error_msg}")
return data.get('data', {}).get('result', [])
except requests.exceptions.RequestException as e:
logging.error(f"连接 Prometheus 失败: {e}")
raise ConnectionError(f"Failed to connect to Prometheus at {self.api_url}") from e
except (KeyError, IndexError, ValueError) as e:
logging.error(f"解析 Prometheus 响应失败: {e}")
raise ValueError("Invalid response format from Prometheus") from e
这里的坑在于:必须处理 status
不为 success
的情况,并记录下 errorType
和 error
,这对于调试 PromQL 语法错误至关重要。
2. ModelWrapper:隔离机器学习的复杂性
ModelWrapper
的职责是让业务代码感觉不到 Scikit-learn 的存在。它提供一个干净的 predict
接口。
# service/model_wrapper.py
import pickle
import numpy as np
import logging
from typing import List
class ModelWrapper:
"""
封装 Scikit-learn 模型,提供加载和预测的接口。
"""
def __init__(self, model_path: str):
self.model_path = model_path
self._model = None
self.load_model()
def load_model(self):
"""
从文件加载模型。如果失败,服务将无法启动。
"""
try:
logging.info(f"正在从 {self.model_path} 加载模型...")
with open(self.model_path, 'rb') as f:
self._model = pickle.load(f)
logging.info("模型加载成功。")
except FileNotFoundError:
logging.error(f"模型文件未找到: {self.model_path}")
raise
except (pickle.UnpicklingError, EOFError) as e:
logging.error(f"加载模型文件失败,文件可能已损坏: {e}")
raise
def predict(self, features: List[List[float]]) -> np.ndarray:
"""
使用加载的模型进行预测。
:param features: 一个二维列表,每行代表一个样本的特征。
:return: 一个 numpy 数组,包含每个样本的预测结果 (-1 for anomaly, 1 for normal)。
"""
if not features:
return np.array([])
if self._model is None:
logging.error("模型未加载,无法进行预测。")
raise RuntimeError("Model is not loaded.")
try:
# 转换为 numpy 数组,这是 scikit-learn 的标准输入格式
feature_array = np.array(features)
# 在真实项目中,这里可能还需要进行特征缩放等预处理
return self._model.predict(feature_array)
except Exception as e:
logging.error(f"模型预测时发生错误: {e}")
# 避免因单个预测失败而导致整个服务崩溃
# 返回一个空数组,上层逻辑需要处理这种情况
return np.array([])
一个关键点是在 __init__
中直接调用 load_model
。这意味着如果模型文件有问题,服务在启动时就会失败。这是一种“快速失败”的设计哲学,避免服务在运行时才发现这个致命错误。
3. MetricsExporter:暴露内部状态
这是服务的“仪表盘”。我们不仅要暴露模型的预测结果,更要暴露服务自身的健康状况。
# service/metrics_exporter.py
from prometheus_client import Gauge, Counter, Histogram
import logging
class MetricsExporter:
"""
管理所有需要暴露给 Prometheus 的指标。
"""
def __init__(self, service_name: str = "anomaly_detection_service"):
# 业务指标: 每个时间序列的异常得分
self.anomaly_score = Gauge(
f'{service_name}_anomaly_score',
'Anomaly score for a given time series. -1 for anomaly, 1 for normal.',
['instance', 'job'] # 这里的标签应该与被监控的指标标签保持一致
)
# 服务健康指标
self.predictions_total = Counter(
f'{service_name}_predictions_total',
'Total number of predictions made.'
)
self.errors_total = Counter(
f'{service_name}_errors_total',
'Total number of errors encountered.',
['error_type'] # 按错误类型分类,例如 'prometheus_query', 'model_predict'
)
self.last_successful_run_timestamp = Gauge(
f'{service_name}_last_successful_run_timestamp',
'Timestamp of the last successful prediction run.'
)
self.prediction_latency = Histogram(
f'{service_name}_prediction_latency_seconds',
'Latency of the prediction loop in seconds.'
)
logging.info("MetricsExporter 初始化完成。")
def report_anomaly_score(self, labels: dict, score: int):
"""
报告一个时间序列的异常分数。
:param labels: 包含该时间序列所有标签的字典。
:param score: 模型的预测结果 (-1 或 1)。
"""
# .labels() 方法会返回一个特定标签组合的子指标
self.anomaly_score.labels(**labels).set(score)
def increment_error(self, error_type: str):
self.errors_total.labels(error_type=error_type).inc()
def increment_prediction(self):
self.predictions_total.inc()
anomaly_score
使用 Gauge
类型,因为它会上下波动。predictions_total
和 errors_total
使用 Counter
,因为它们是只增不减的计数器。使用 Histogram
来度量 prediction_latency
,可以让我们后续计算分位数(如 p99 延迟),比简单的 Gauge
或 Summary
提供更多信息。
单元测试:质量保障的核心
现在,所有组件都已解耦,我们可以为它们编写隔离的、可靠的单元测试了。我们将使用 pytest
和 unittest.mock
。
测试 PrometheusClient
我们需要模拟 requests.get
的行为,包括成功、HTTP 错误和返回格式错误的情况。
# tests/test_prometheus_client.py
import pytest
from unittest.mock import patch, MagicMock
from service.prometheus_client import PrometheusClient
@pytest.fixture
def client():
return PrometheusClient(api_url="http://fake-prometheus:9090")
# 模拟一个成功的 Prometheus API 响应
MOCK_SUCCESS_RESPONSE = {
"status": "success",
"data": {
"resultType": "vector",
"result": [
{
"metric": {"instance": "node1", "job": "node_exporter"},
"value": [1672531200, "0.85"]
}
]
}
}
# 模拟一个查询失败的响应
MOCK_FAILURE_RESPONSE = {
"status": "error",
"errorType": "bad_data",
"error": "invalid expression"
}
@patch('requests.get')
def test_query_success(mock_get, client):
"""测试成功的查询"""
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = MOCK_SUCCESS_RESPONSE
mock_get.return_value = mock_response
result = client.query("some_query")
mock_get.assert_called_once_with(
client.query_url,
params={'query': 'some_query'},
timeout=10
)
assert result == MOCK_SUCCESS_RESPONSE['data']['result']
@patch('requests.get')
def test_query_http_error(mock_get, client):
"""测试网络或HTTP错误"""
mock_response = MagicMock()
mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError
mock_get.return_value = mock_response
with pytest.raises(ConnectionError):
client.query("some_query")
@patch('requests.get')
def test_query_prometheus_error(mock_get, client):
"""测试 Prometheus 返回 status=error 的情况"""
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = MOCK_FAILURE_RESPONSE
mock_get.return_value = mock_response
with pytest.raises(ValueError, match="Prometheus query failed: bad_data"):
client.query("some_query")
@patch('requests.get')
def test_query_invalid_json(mock_get, client):
"""测试返回的 JSON 格式不正确"""
mock_response = MagicMock()
mock_response.status_code = 200
# 返回一个缺少 'data' 键的响应
mock_response.json.return_value = {"status": "success"}
mock_get.return_value = mock_response
with pytest.raises(ValueError, match="Invalid response format from Prometheus"):
client.query("some_query")
测试 PredictionService 的集成逻辑
这是最重要的测试。我们不关心底层模型或网络请求的真实性,只关心 PredictionService
是否能正确地调用它的依赖项,并根据它们的返回值更新指标。
# service/prediction_service.py
import time
import logging
from .prometheus_client import PrometheusClient
from .model_wrapper import ModelWrapper
from .metrics_exporter import MetricsExporter
class PredictionService:
def __init__(
self,
client: PrometheusClient,
model: ModelWrapper,
exporter: MetricsExporter,
query: str
):
self.client = client
self.model = model
self.exporter = exporter
self.query = query
def run_once(self):
logging.info("开始新一轮预测...")
with self.exporter.prediction_latency.time():
try:
time_series = self.client.query(self.query)
if not time_series:
logging.warning("查询返回空结果。")
return
# 准备模型输入
labels_list = []
features_list = []
for ts in time_series:
# 假设我们只使用值作为特征
# 真实场景可能更复杂,需要多个查询和特征工程
labels_list.append(ts.get('metric', {}))
features_list.append([float(ts['value'][1])])
# 获取预测结果
predictions = self.model.predict(features_list)
# 上报指标
for labels, prediction in zip(labels_list, predictions):
self.exporter.report_anomaly_score(labels, int(prediction))
self.exporter.increment_prediction()
self.exporter.last_successful_run_timestamp.set_to_current_time()
logging.info(f"预测完成,处理了 {len(time_series)} 条时间序列。")
except ConnectionError:
self.exporter.increment_error(error_type='prometheus_query')
except ValueError as e:
logging.error(f"数据处理或模型输入错误: {e}")
self.exporter.increment_error(error_type='data_processing')
except RuntimeError as e:
logging.error(f"模型预测运行时错误: {e}")
self.exporter.increment_error(error_type='model_predict')
对应的测试代码:
# tests/test_prediction_service.py
from unittest.mock import MagicMock
import numpy as np
from service.prediction_service import PredictionService
def test_run_once_success_flow():
"""测试 PredictionService 成功执行的完整流程"""
# 1. 准备 Mock 对象
mock_client = MagicMock(spec=PrometheusClient)
mock_model = MagicMock(spec=ModelWrapper)
mock_exporter = MagicMock() # 使用普通 MagicMock 更容易断言
# 2. 设定 Mock 对象的行为
mock_prometheus_response = [
{'metric': {'instance': 'node1'}, 'value': [0, '10.0']},
{'metric': {'instance': 'node2'}, 'value': [0, '100.0']},
]
mock_client.query.return_value = mock_prometheus_response
mock_model_prediction = np.array([1, -1]) # node1 正常, node2 异常
mock_model.predict.return_value = mock_model_prediction
# 3. 初始化被测服务
service = PredictionService(mock_client, mock_model, mock_exporter, query="fake_query")
# 4. 执行被测方法
service.run_once()
# 5. 断言
# 确认依赖项被正确调用
mock_client.query.assert_called_once_with("fake_query")
# 确认模型接收了正确格式的特征
# np.testing.assert_array_equal 用于比较 numpy 数组
np.testing.assert_array_equal(
mock_model.predict.call_args[0][0],
np.array([[10.0], [100.0]])
)
# 确认指标被正确上报
assert mock_exporter.report_anomaly_score.call_count == 2
mock_exporter.report_anomaly_score.assert_any_call({'instance': 'node1'}, 1)
mock_exporter.report_anomaly_score.assert_any_call({'instance': 'node2'}, -1)
assert mock_exporter.increment_prediction.call_count == 2
mock_exporter.last_successful_run_timestamp.set_to_current_time.assert_called_once()
mock_exporter.increment_error.assert_not_called()
def test_run_once_handles_query_error():
"""测试当 Prometheus 查询失败时,服务能正确处理并上报错误"""
mock_client = MagicMock(spec=PrometheusClient)
mock_model = MagicMock(spec=ModelWrapper)
mock_exporter = MagicMock()
mock_client.query.side_effect = ConnectionError("Connection failed")
service = PredictionService(mock_client, mock_model, mock_exporter, query="fake_query")
service.run_once()
mock_model.predict.assert_not_called()
mock_exporter.increment_error.assert_called_once_with(error_type='prometheus_query')
mock_exporter.report_anomaly_score.assert_not_called()
这些测试覆盖了核心的业务逻辑,确保了数据在各个组件之间的正确流动和转换,以及在异常情况下的正确响应。它们完全不依赖于网络或文件系统,执行速度极快,可以轻松集成到 CI 流水线中。
服务的最终形态与局限性
最终,我们将所有组件组装在一个简单的 Web 服务器中,它只做一件事:暴露 /metrics
端点供 Prometheus 抓取。
# main.py
import os
import time
from prometheus_client import start_http_server
from service.prometheus_client import PrometheusClient
from service.model_wrapper import ModelWrapper
from service.metrics_exporter import MetricsExporter
from service.prediction_service import PredictionService
if __name__ == '__main__':
# 从环境变量获取配置,这是云原生应用的最佳实践
PROMETHEUS_URL = os.getenv("PROMETHEUS_URL", "http://localhost:9090")
MODEL_PATH = os.getenv("MODEL_PATH", "model.pkl")
QUERY = os.getenv("PROMQL_QUERY", 'vector(1)')
LISTEN_PORT = int(os.getenv("LISTEN_PORT", 8000))
SCRAPE_INTERVAL = int(os.getenv("SCRAPE_INTERVAL", 60))
# 1. 初始化所有组件
client = PrometheusClient(api_url=PROMETHEUS_URL)
model = ModelWrapper(model_path=MODEL_PATH)
exporter = MetricsExporter()
service = PredictionService(
client=client,
model=model,
exporter=exporter,
query=QUERY
)
# 2. 启动 HTTP 服务器以暴露指标
start_http_server(LISTEN_PORT)
# 3. 运行主循环
while True:
service.run_once()
time.sleep(SCRAPE_INTERVAL)
这个方案解决了我们最初的所有痛点。它是一个健壮、可测试、可观测的服务。然而,它并非没有局限性。当前的设计使用了一个静态的、预训练好的模型。在真实世界中,数据分布会随着时间漂移,模型性能会逐渐下降。一个完整的 MLOps 闭环需要包含一个自动化的模型再训练、验证和部署流水线。例如,我们可以定期从 Prometheus 导出历史数据作为训练集,训练新模型,在影子环境中与旧模型进行 A/B 测试,只有当新模型表现更优时,才将其部署到生产环境,替换掉旧的 model.pkl
文件。
此外,对于更复杂的异常模式,单一的 PromQL 查询可能不足以构建有效的特征。可能需要执行多次查询,并将结果在时间维度或标签维度上进行拼接,形成更丰富的特征向量。这会增加 PredictionService
中特征工程的复杂性,同时也对我们的单元测试提出了更高的要求,需要模拟更复杂的 PrometheusClient
返回值。这些都是我们下一步迭代的方向。