我們的系統遇到一個典型的瓶頸。寫入負載(Command 側)通過事件通知更新讀取模型(Query 側),一個標準的 CQRS 實現。最初,我們用 ZeroMQ 的 PUB/SUB 模式來廣播領域事件,看中的是它的簡潔和高性能。在開發和低負載測試中,一切正常。但在一次壓力測試中,Query 側的幾個服務由於數據庫寫入延遲,事件消費速度跟不上生產速度,最終因為內存隊列溢出而崩潰。
這就是 ZeroMQ 中著名的“慢速訂閱者”問題。PUB 套接字不會關心 SUB 是否能跟上,它只會不停地推送數據。如果 SUB 的處理邏輯阻塞或變慢,消息會在 SUB 的內存隊列中堆積,最終導致進程崩潰。在生產環境中,這種不穩定的行為是不可接受的。
最初的、有問題的代碼結構大致如下:
有問題的 Publisher (Command Service)
// command-service/publisher.js
const zmq = require('zeromq');
class EventPublisher {
constructor(port) {
this.socket = new zmq.Publisher();
this.port = port;
}
async start() {
await this.socket.bind(`tcp://*:${this.port}`);
console.log(`Event publisher bound to port ${this.port}`);
}
publish(topic, eventData) {
// 不關心消費者死活,直接發送
// 這就是問題的根源
const payload = JSON.stringify(eventData);
this.socket.send([topic, payload]);
}
}
// 模擬高頻事件發布
async function main() {
const publisher = new EventPublisher(5555);
await publisher.start();
let eventId = 0;
setInterval(() => {
eventId++;
publisher.publish('user.created', { id: eventId, name: `User ${eventId}`, ts: Date.now() });
if (eventId % 1000 === 0) {
console.log(`Published ${eventId} events...`);
}
}, 1); // 每毫秒一個事件
}
main();
有問題的 Subscriber (Query Service)
// query-service/subscriber.js
const zmq = require('zeromq');
class EventSubscriber {
constructor(publisherAddress) {
this.socket = new zmq.Subscriber();
this.address = publisherAddress;
}
async start(topic) {
this.socket.connect(this.address);
this.socket.subscribe(topic);
console.log(`Subscriber connected to ${this.address} on topic "${topic}"`);
for await (const [topic, msg] of this.socket) {
// 模擬耗時的數據庫操作
await this.processEvent(JSON.parse(msg.toString()));
}
}
async processEvent(event) {
// 這裡的延遲會導致 ZMQ 內部緩衝區爆炸
console.log(`Processing event ${event.id}`);
await new Promise(resolve => setTimeout(resolve, 100)); // 處理速度遠慢於生產速度
}
}
async function main() {
const subscriber = new EventSubscriber('tcp://localhost:5555');
await subscriber.start('user.created');
}
main();
運行上述代碼,Query Service 會在短時間內因內存耗盡而崩潰。這促使我們重新思考事件總線的實現。放棄 ZeroMQ 轉投 Kafka 或 RabbitMQ 是一個選項,它們內置了流控和持久化。但在我們的場景中,我們追求的是 brokerless 架構的低延遲和運維簡單性。因此,我們決定在 ZeroMQ 的基礎上,自己實現一套可靠的、帶有反壓(Backpressure)機制的通信模式。
從 PUB/SUB 到 ROUTER/DEALER 的演進
要實現反壓,通信必須是雙向的。消費者需要一種方式通知生產者:“我現在很忙,請慢一點”。PUB/SUB 是單向的,無法滿足這個需求。ZeroMQ 的 ROUTER
/DEALER
模式是解決方案的關鍵。
-
ROUTER
: 異步接收所有連接的消息,並能記住每個消息來源的身份(identity),從而可以精確地將響應發回給指定的客戶端。 -
DEALER
: 異步地向ROUTER
發送消息,並能接收發給自己的響應。
我們設計了一種基於“信用”(Credit-Based)的流控協議。
- 初始化: Query Service (Consumer) 連接到 Command Service (Producer) 的
ROUTER
套接字。連接成功後,它會立即發送一個初始“信用點”消息,例如[ 'CREDIT', 100 ]
,表示它準備好處理 100 條消息。 - 消息發送: Command Service 的
ROUTER
端維護每個已連接 Consumer 的信用餘額。當有新事件需要發送時,它會查找目標 Consumer,檢查其信用點是否大於 0。如果是,則發送事件並將其信用點減 1。如果信用點為 0,則將事件緩存在內存隊列中,等待 Consumer 補充信用。 - 信用補充: Query Service 每處理完一批消息(例如 50 條),就會向 Command Service 發送一個
CREDIT
消息,補充相應數量的信用點。 - 心跳與斷線處理:
ROUTER
可以通過心跳機制檢測DEALER
是否掉線。如果一個 Consumer 長時間沒有響應或補充信用,Producer 會認為它已失聯,並清理其狀態和隊列。
sequenceDiagram participant QueryService as Query Service (DEALER) participant CommandService as Command Service (ROUTER) QueryService->>CommandService: Connect & Send Initial Credits (e.g., 100) CommandService-->>QueryService: Acknowledge (Implicitly by accepting connection) loop Event Production & Consumption CommandService->>CommandService: Has event to send? alt Consumer has credits > 0 CommandService->>QueryService: Send Event Message CommandService->>CommandService: Decrement credits for this consumer else Consumer has credits == 0 CommandService->>CommandService: Buffer event in internal queue end QueryService->>QueryService: Process a batch of events (e.g., 50) QueryService->>CommandService: Send More Credits (e.g., 50) CommandService->>CommandService: Increment credits for this consumer CommandService->>CommandService: Drain buffered events if possible end
生產級代碼實現
下面是改造後的、更健壯的實現。我們將其拆分為幾個核心模塊:配置、日誌、Producer 和 Consumer。
配置文件 (config.js
)
在真實項目中,配置應該是外部化的。
// config.js
module.exports = {
producer: {
bindAddress: 'tcp://*:6677',
// 當某個 consumer 的隊列超過這個值,就發出警告
queueHighWaterMark: 10000,
// 心跳檢測間隔(毫秒)
heartbeatInterval: 5000,
// 超過多少個心跳間隔未收到響應視為客戶端超時
clientTimeoutIntervals: 3,
},
consumer: {
producerAddress: 'tcp://localhost:6677',
// Consumer 的唯一標識
identity: `query-service-${process.pid}`,
// 初始信用和每次補充的信用數量
creditBatchSize: 100,
// 模擬處理延遲
processingDelayMs: 20,
},
logging: {
level: 'info',
}
};
日誌模塊 (logger.js
)
使用 pino 進行結構化日誌記錄。
// logger.js
const pino = require('pino');
const config = require('./config');
const logger = pino({
level: config.logging.level,
transport: {
target: 'pino-pretty'
},
});
module.exports = logger;
重構後的 Producer (Command Service)
這個實現包含了客戶端管理、信用管理、事件緩衝和心跳檢測。
// command-service/resilient-producer.js
const zmq = require('zeromq');
const config = require('../config').producer;
const logger = require('../logger');
class ResilientProducer {
constructor() {
this.socket = new zmq.Router();
this.clients = new Map(); // K: identity, V: { credits: number, queue: Array, lastHeartbeat: number }
}
async start() {
await this.socket.bind(config.bindAddress);
logger.info(`Resilient producer bound to ${config.bindAddress}`);
this.messageLoop();
setInterval(() => this.checkHeartbeats(), config.heartbeatInterval);
}
// 核心消息處理循環
async messageLoop() {
logger.info('Starting producer message loop...');
for await (const [identity, delimiter, command, ...args] of this.socket) {
const clientId = identity.toString();
const cmd = command.toString();
if (!this.clients.has(clientId)) {
this.registerClient(clientId);
}
this.clients.get(clientId).lastHeartbeat = Date.now();
switch (cmd) {
case 'READY':
const initialCredits = parseInt(args[0].toString(), 10);
this.addCredits(clientId, initialCredits);
logger.info({ clientId, credits: initialCredits }, 'Client is ready and provided initial credits.');
break;
case 'CREDIT':
const moreCredits = parseInt(args[0].toString(), 10);
this.addCredits(clientId, moreCredits);
break;
case 'HEARTBEAT':
// 心跳響應已在前面更新 lastHeartbeat,這裡只需響應
this.socket.send([identity, '', 'HEARTBEAT_ACK']);
break;
default:
logger.warn({ clientId, cmd }, 'Received unknown command from client.');
}
}
}
registerClient(clientId) {
logger.info({ clientId }, 'New client connected. Registering...');
this.clients.set(clientId, {
credits: 0,
queue: [],
lastHeartbeat: Date.now(),
});
}
addCredits(clientId, amount) {
if (!this.clients.has(clientId)) return;
const client = this.clients.get(clientId);
client.credits += amount;
logger.debug({ clientId, added: amount, total: client.credits }, 'Credits added.');
// 有了新的信用點,嘗試清空該客戶端的緩存隊列
this.drainQueue(clientId);
}
// 嘗試發送緩存的消息
drainQueue(clientId) {
if (!this.clients.has(clientId)) return;
const client = this.clients.get(clientId);
while (client.queue.length > 0 && client.credits > 0) {
const event = client.queue.shift();
this.sendToClient(clientId, event);
client.credits--;
}
}
sendToClient(clientId, event) {
// 消息格式: [identity, delimiter, 'EVENT', topic, payload]
this.socket.send([
clientId,
'',
'EVENT',
event.topic,
JSON.stringify(event.payload),
]);
}
// 公開的發布接口
publish(topic, payload) {
const event = { topic, payload };
// 廣播給所有客戶端
for (const clientId of this.clients.keys()) {
const client = this.clients.get(clientId);
if (client.credits > 0) {
this.sendToClient(clientId, event);
client.credits--;
} else {
client.queue.push(event);
if (client.queue.length > config.queueHighWaterMark) {
logger.warn({ clientId, queueSize: client.queue.length }, 'Client queue high water mark exceeded.');
}
}
}
}
checkHeartbeats() {
const now = Date.now();
const timeout = config.heartbeatInterval * config.clientTimeoutIntervals;
for (const [clientId, client] of this.clients.entries()) {
if (now - client.lastHeartbeat > timeout) {
logger.warn({ clientId }, 'Client timed out. Removing.');
this.clients.delete(clientId);
}
}
}
}
// 模擬使用 Producer
async function main() {
const producer = new ResilientProducer();
await producer.start();
let eventId = 0;
setInterval(() => {
eventId++;
producer.publish('user.updated', { id: eventId, email: `user${eventId}@example.com`, ts: Date.now() });
if (eventId % 1000 === 0) {
logger.info(`Produced ${eventId} events...`);
}
}, 2); // 稍微降低生產速度以模擬真實場景
}
main();
重構後的 Consumer (Query Service)
這個實現包含了發送信用點、處理事件和心跳邏輯。
// query-service/resilient-consumer.js
const zmq = require('zeromq');
const config = require('../config').consumer;
const producerConfig = require('../config').producer;
const logger = require('../logger');
class ResilientConsumer {
constructor() {
this.socket = new zmq.Dealer();
this.socket.identity = config.identity;
this.processedCount = 0;
}
async start() {
this.socket.connect(config.producerAddress);
logger.info({ identity: this.socket.identity }, `Consumer connecting to ${config.producerAddress}`);
// 連接後立即發送 READY 信號和初始信用
this.socket.send(['', 'READY', config.creditBatchSize.toString()]);
this.messageLoop();
setInterval(() => this.sendHeartbeat(), producerConfig.heartbeatInterval);
}
async messageLoop() {
logger.info('Starting consumer message loop...');
for await (const [delimiter, command, ...args] of this.socket) {
const cmd = command.toString();
switch (cmd) {
case 'EVENT':
const topic = args[0].toString();
const payload = JSON.parse(args[1].toString());
await this.handleEvent(topic, payload);
break;
case 'HEARTBEAT_ACK':
logger.debug('Heartbeat ACK received.');
break;
default:
logger.warn({ cmd }, 'Received unknown command from producer.');
}
}
}
async handleEvent(topic, payload) {
logger.debug({ topic, payload }, 'Event received.');
// 模擬異步的、耗時的數據庫操作
await new Promise(resolve => setTimeout(resolve, config.processingDelayMs));
this.processedCount++;
// 每處理完一個批次的事件,就補充信用
if (this.processedCount % config.creditBatchSize === 0) {
this.replenishCredits();
}
}
replenishCredits() {
logger.info({ count: config.creditBatchSize }, 'Processed a batch, replenishing credits.');
this.socket.send(['', 'CREDIT', config.creditBatchSize.toString()]);
}
sendHeartbeat() {
logger.debug('Sending heartbeat...');
this.socket.send(['', 'HEARTBEAT']);
}
}
async function main() {
const consumer = new ResilientConsumer();
await consumer.start();
}
main();
關於單元測試的思路
在真實項目中,對這種通信協議進行測試至關重要。
- Producer 測試:
- 模擬一個 Client
DEALER
連接上來,發送READY
和CREDIT
消息。 - 斷言
producer.clients
Map 中正確創建了客戶端狀態。 - 調用
producer.publish
,如果客戶端有信用,斷言socket.send
被正確調用。 - 將客戶端信用設為 0,調用
producer.publish
,斷言消息被放入了隊列,socket.send
未被調用。 - 再模擬客戶端發送
CREDIT
消息,斷言隊列被清空,socket.send
被調用。 - 測試心跳超時:可以通過 time-mocking 庫來模擬時間流逝,斷言超時的客戶端被從
clients
Map 中移除。
- 模擬一個 Client
- Consumer 測試:
- 模擬一個
ROUTER
Socket。 - 啟動 Consumer 後,斷言它立即發送了
READY
消息。 - 模擬
ROUTER
發送EVENT
消息給 Consumer。 - 斷言
handleEvent
被調用。 - 通過控制
handleEvent
的調用次數,斷言當達到creditBatchSize
時,Consumer 會發送CREDIT
消息。
- 模擬一個
當前方案的局限性與未來展望
這個基於 ZeroMQ ROUTER
/DEALER
和信用流控的方案,有效地解決了慢速消費者的問題,使我們的 CQRS 事件總線變得穩定可靠。它保留了 brokerless 的優點,同時增加了必要的韌性。
然而,這個方案並非銀彈,它存在一些固有的局限性:
- Producer 的單點故障 (SPOF):
ResilientProducer
本身是一個單點。如果它崩潰,所有在途和緩存中的事件都會丟失。在生產環境中,需要對 Producer 做高可用部署,比如主備模式,這會增加架構的複雜性。 - 內存緩存的風險: 雖然我們有反壓機制,但如果所有 Consumer 都長時間處於慢速狀態,事件仍然會在 Producer 的內存隊列中無限堆積,最終導致 Producer 崩潰。一個改進方向是為 Producer 的總隊列設置一個上限,超過上限時可以選擇阻塞上游業務、丟棄非關鍵事件或將事件轉存到持久化存儲中。這就是所謂的“斷路器”模式。
- 事件持久性: 當前實現是純內存的。任何組件的重啟都會導致數據丟失。對於需要強一致性和可回溯性的場景,這個方案是不夠的。真正的事件溯源(Event Sourcing)系統需要一個持久化的、僅追加的日誌作為事件存儲(Event Store)。我們的 Producer 可以被看作是這個 Event Store 的一個分發層。一個可行的演進路徑是,在 Producer 發送事件前,先將其寫入到一個持久化日誌(如 PostgreSQL 的一個表,或專用的 Event Store 產品),然後再進行內存分發。這就實現了“事務性發件箱”(Transactional Outbox)模式。
- 新消費者加入: 如果一個新的 Query Service 實例上線,它如何獲取歷史狀態?當前設計無法滿足。它只能從上線的那一刻起接收新事件。完整的 CQRS 解決方案需要新服務能夠從 Event Store 中重放(Replay)所有歷史事件來構建自己的讀模型。
總之,我們構建的不是一個通用的消息隊列,而是一個特定場景下(CQRS 讀寫模型同步)的高性能、低延遲、有反壓能力的事件總線。它很好地平衡了性能和可靠性,但在持久化和系統恢復能力上做出了妥協。這在很多業務場景中是完全可以接受的權衡。下一步的迭代將聚焦於為 Producer 增加可選的持久化後端,並引入服務發現機制,使其更接近一個完備的平台級組件。