使用 ZeroMQ 與 Node.js 構建具備反壓機制的 CQRS 讀模型同步總線


我們的系統遇到一個典型的瓶頸。寫入負載(Command 側)通過事件通知更新讀取模型(Query 側),一個標準的 CQRS 實現。最初,我們用 ZeroMQ 的 PUB/SUB 模式來廣播領域事件,看中的是它的簡潔和高性能。在開發和低負載測試中,一切正常。但在一次壓力測試中,Query 側的幾個服務由於數據庫寫入延遲,事件消費速度跟不上生產速度,最終因為內存隊列溢出而崩潰。

這就是 ZeroMQ 中著名的“慢速訂閱者”問題。PUB 套接字不會關心 SUB 是否能跟上,它只會不停地推送數據。如果 SUB 的處理邏輯阻塞或變慢,消息會在 SUB 的內存隊列中堆積,最終導致進程崩潰。在生產環境中,這種不穩定的行為是不可接受的。

最初的、有問題的代碼結構大致如下:

有問題的 Publisher (Command Service)

// command-service/publisher.js
const zmq = require('zeromq');

class EventPublisher {
  constructor(port) {
    this.socket = new zmq.Publisher();
    this.port = port;
  }

  async start() {
    await this.socket.bind(`tcp://*:${this.port}`);
    console.log(`Event publisher bound to port ${this.port}`);
  }

  publish(topic, eventData) {
    // 不關心消費者死活,直接發送
    // 這就是問題的根源
    const payload = JSON.stringify(eventData);
    this.socket.send([topic, payload]);
  }
}

// 模擬高頻事件發布
async function main() {
  const publisher = new EventPublisher(5555);
  await publisher.start();
  
  let eventId = 0;
  setInterval(() => {
    eventId++;
    publisher.publish('user.created', { id: eventId, name: `User ${eventId}`, ts: Date.now() });
    if (eventId % 1000 === 0) {
      console.log(`Published ${eventId} events...`);
    }
  }, 1); // 每毫秒一個事件
}

main();

有問題的 Subscriber (Query Service)

// query-service/subscriber.js
const zmq = require('zeromq');

class EventSubscriber {
  constructor(publisherAddress) {
    this.socket = new zmq.Subscriber();
    this.address = publisherAddress;
  }

  async start(topic) {
    this.socket.connect(this.address);
    this.socket.subscribe(topic);
    console.log(`Subscriber connected to ${this.address} on topic "${topic}"`);

    for await (const [topic, msg] of this.socket) {
      // 模擬耗時的數據庫操作
      await this.processEvent(JSON.parse(msg.toString()));
    }
  }

  async processEvent(event) {
    // 這裡的延遲會導致 ZMQ 內部緩衝區爆炸
    console.log(`Processing event ${event.id}`);
    await new Promise(resolve => setTimeout(resolve, 100)); // 處理速度遠慢於生產速度
  }
}

async function main() {
  const subscriber = new EventSubscriber('tcp://localhost:5555');
  await subscriber.start('user.created');
}

main();

運行上述代碼,Query Service 會在短時間內因內存耗盡而崩潰。這促使我們重新思考事件總線的實現。放棄 ZeroMQ 轉投 Kafka 或 RabbitMQ 是一個選項,它們內置了流控和持久化。但在我們的場景中,我們追求的是 brokerless 架構的低延遲和運維簡單性。因此,我們決定在 ZeroMQ 的基礎上,自己實現一套可靠的、帶有反壓(Backpressure)機制的通信模式。

從 PUB/SUB 到 ROUTER/DEALER 的演進

要實現反壓,通信必須是雙向的。消費者需要一種方式通知生產者:“我現在很忙,請慢一點”。PUB/SUB 是單向的,無法滿足這個需求。ZeroMQ 的 ROUTER/DEALER 模式是解決方案的關鍵。

  • ROUTER: 異步接收所有連接的消息,並能記住每個消息來源的身份(identity),從而可以精確地將響應發回給指定的客戶端。
  • DEALER: 異步地向 ROUTER 發送消息,並能接收發給自己的響應。

我們設計了一種基於“信用”(Credit-Based)的流控協議。

  1. 初始化: Query Service (Consumer) 連接到 Command Service (Producer) 的 ROUTER 套接字。連接成功後,它會立即發送一個初始“信用點”消息,例如 [ 'CREDIT', 100 ],表示它準備好處理 100 條消息。
  2. 消息發送: Command Service 的 ROUTER 端維護每個已連接 Consumer 的信用餘額。當有新事件需要發送時,它會查找目標 Consumer,檢查其信用點是否大於 0。如果是,則發送事件並將其信用點減 1。如果信用點為 0,則將事件緩存在內存隊列中,等待 Consumer 補充信用。
  3. 信用補充: Query Service 每處理完一批消息(例如 50 條),就會向 Command Service 發送一個 CREDIT 消息,補充相應數量的信用點。
  4. 心跳與斷線處理: ROUTER 可以通過心跳機制檢測 DEALER 是否掉線。如果一個 Consumer 長時間沒有響應或補充信用,Producer 會認為它已失聯,並清理其狀態和隊列。
sequenceDiagram
    participant QueryService as Query Service (DEALER)
    participant CommandService as Command Service (ROUTER)

    QueryService->>CommandService: Connect & Send Initial Credits (e.g., 100)
    CommandService-->>QueryService: Acknowledge (Implicitly by accepting connection)
    
    loop Event Production & Consumption
        CommandService->>CommandService: Has event to send?
        alt Consumer has credits > 0
            CommandService->>QueryService: Send Event Message
            CommandService->>CommandService: Decrement credits for this consumer
        else Consumer has credits == 0
            CommandService->>CommandService: Buffer event in internal queue
        end

        QueryService->>QueryService: Process a batch of events (e.g., 50)
        QueryService->>CommandService: Send More Credits (e.g., 50)
        CommandService->>CommandService: Increment credits for this consumer
        CommandService->>CommandService: Drain buffered events if possible
    end

生產級代碼實現

下面是改造後的、更健壯的實現。我們將其拆分為幾個核心模塊:配置、日誌、Producer 和 Consumer。

配置文件 (config.js)

在真實項目中,配置應該是外部化的。

// config.js
module.exports = {
  producer: {
    bindAddress: 'tcp://*:6677',
    // 當某個 consumer 的隊列超過這個值,就發出警告
    queueHighWaterMark: 10000, 
    // 心跳檢測間隔(毫秒)
    heartbeatInterval: 5000,
    // 超過多少個心跳間隔未收到響應視為客戶端超時
    clientTimeoutIntervals: 3, 
  },
  consumer: {
    producerAddress: 'tcp://localhost:6677',
    // Consumer 的唯一標識
    identity: `query-service-${process.pid}`, 
    // 初始信用和每次補充的信用數量
    creditBatchSize: 100, 
    // 模擬處理延遲
    processingDelayMs: 20, 
  },
  logging: {
    level: 'info',
  }
};

日誌模塊 (logger.js)

使用 pino 進行結構化日誌記錄。

// logger.js
const pino = require('pino');
const config = require('./config');

const logger = pino({
  level: config.logging.level,
  transport: {
    target: 'pino-pretty'
  },
});

module.exports = logger;

重構後的 Producer (Command Service)

這個實現包含了客戶端管理、信用管理、事件緩衝和心跳檢測。

// command-service/resilient-producer.js
const zmq = require('zeromq');
const config = require('../config').producer;
const logger = require('../logger');

class ResilientProducer {
  constructor() {
    this.socket = new zmq.Router();
    this.clients = new Map(); // K: identity, V: { credits: number, queue: Array, lastHeartbeat: number }
  }

  async start() {
    await this.socket.bind(config.bindAddress);
    logger.info(`Resilient producer bound to ${config.bindAddress}`);
    
    this.messageLoop();
    setInterval(() => this.checkHeartbeats(), config.heartbeatInterval);
  }

  // 核心消息處理循環
  async messageLoop() {
    logger.info('Starting producer message loop...');
    for await (const [identity, delimiter, command, ...args] of this.socket) {
      const clientId = identity.toString();
      const cmd = command.toString();

      if (!this.clients.has(clientId)) {
        this.registerClient(clientId);
      }
      this.clients.get(clientId).lastHeartbeat = Date.now();

      switch (cmd) {
        case 'READY':
          const initialCredits = parseInt(args[0].toString(), 10);
          this.addCredits(clientId, initialCredits);
          logger.info({ clientId, credits: initialCredits }, 'Client is ready and provided initial credits.');
          break;
        case 'CREDIT':
          const moreCredits = parseInt(args[0].toString(), 10);
          this.addCredits(clientId, moreCredits);
          break;
        case 'HEARTBEAT':
          // 心跳響應已在前面更新 lastHeartbeat,這裡只需響應
          this.socket.send([identity, '', 'HEARTBEAT_ACK']);
          break;
        default:
          logger.warn({ clientId, cmd }, 'Received unknown command from client.');
      }
    }
  }

  registerClient(clientId) {
    logger.info({ clientId }, 'New client connected. Registering...');
    this.clients.set(clientId, {
      credits: 0,
      queue: [],
      lastHeartbeat: Date.now(),
    });
  }

  addCredits(clientId, amount) {
    if (!this.clients.has(clientId)) return;
    const client = this.clients.get(clientId);
    client.credits += amount;
    logger.debug({ clientId, added: amount, total: client.credits }, 'Credits added.');
    // 有了新的信用點,嘗試清空該客戶端的緩存隊列
    this.drainQueue(clientId);
  }

  // 嘗試發送緩存的消息
  drainQueue(clientId) {
    if (!this.clients.has(clientId)) return;
    const client = this.clients.get(clientId);

    while (client.queue.length > 0 && client.credits > 0) {
      const event = client.queue.shift();
      this.sendToClient(clientId, event);
      client.credits--;
    }
  }

  sendToClient(clientId, event) {
    // 消息格式: [identity, delimiter, 'EVENT', topic, payload]
    this.socket.send([
      clientId,
      '',
      'EVENT',
      event.topic,
      JSON.stringify(event.payload),
    ]);
  }
  
  // 公開的發布接口
  publish(topic, payload) {
    const event = { topic, payload };
    // 廣播給所有客戶端
    for (const clientId of this.clients.keys()) {
      const client = this.clients.get(clientId);
      if (client.credits > 0) {
        this.sendToClient(clientId, event);
        client.credits--;
      } else {
        client.queue.push(event);
        if (client.queue.length > config.queueHighWaterMark) {
          logger.warn({ clientId, queueSize: client.queue.length }, 'Client queue high water mark exceeded.');
        }
      }
    }
  }

  checkHeartbeats() {
    const now = Date.now();
    const timeout = config.heartbeatInterval * config.clientTimeoutIntervals;
    
    for (const [clientId, client] of this.clients.entries()) {
      if (now - client.lastHeartbeat > timeout) {
        logger.warn({ clientId }, 'Client timed out. Removing.');
        this.clients.delete(clientId);
      }
    }
  }
}

// 模擬使用 Producer
async function main() {
  const producer = new ResilientProducer();
  await producer.start();

  let eventId = 0;
  setInterval(() => {
    eventId++;
    producer.publish('user.updated', { id: eventId, email: `user${eventId}@example.com`, ts: Date.now() });
    if (eventId % 1000 === 0) {
        logger.info(`Produced ${eventId} events...`);
    }
  }, 2); // 稍微降低生產速度以模擬真實場景
}

main();

重構後的 Consumer (Query Service)

這個實現包含了發送信用點、處理事件和心跳邏輯。

// query-service/resilient-consumer.js
const zmq = require('zeromq');
const config = require('../config').consumer;
const producerConfig = require('../config').producer;
const logger = require('../logger');

class ResilientConsumer {
  constructor() {
    this.socket = new zmq.Dealer();
    this.socket.identity = config.identity;
    this.processedCount = 0;
  }

  async start() {
    this.socket.connect(config.producerAddress);
    logger.info({ identity: this.socket.identity }, `Consumer connecting to ${config.producerAddress}`);
    
    // 連接後立即發送 READY 信號和初始信用
    this.socket.send(['', 'READY', config.creditBatchSize.toString()]);

    this.messageLoop();
    setInterval(() => this.sendHeartbeat(), producerConfig.heartbeatInterval);
  }

  async messageLoop() {
    logger.info('Starting consumer message loop...');
    for await (const [delimiter, command, ...args] of this.socket) {
      const cmd = command.toString();
      switch (cmd) {
        case 'EVENT':
          const topic = args[0].toString();
          const payload = JSON.parse(args[1].toString());
          await this.handleEvent(topic, payload);
          break;
        case 'HEARTBEAT_ACK':
          logger.debug('Heartbeat ACK received.');
          break;
        default:
          logger.warn({ cmd }, 'Received unknown command from producer.');
      }
    }
  }
  
  async handleEvent(topic, payload) {
    logger.debug({ topic, payload }, 'Event received.');
    // 模擬異步的、耗時的數據庫操作
    await new Promise(resolve => setTimeout(resolve, config.processingDelayMs));
    this.processedCount++;

    // 每處理完一個批次的事件,就補充信用
    if (this.processedCount % config.creditBatchSize === 0) {
      this.replenishCredits();
    }
  }

  replenishCredits() {
    logger.info({ count: config.creditBatchSize }, 'Processed a batch, replenishing credits.');
    this.socket.send(['', 'CREDIT', config.creditBatchSize.toString()]);
  }
  
  sendHeartbeat() {
    logger.debug('Sending heartbeat...');
    this.socket.send(['', 'HEARTBEAT']);
  }
}

async function main() {
  const consumer = new ResilientConsumer();
  await consumer.start();
}

main();

關於單元測試的思路

在真實項目中,對這種通信協議進行測試至關重要。

  1. Producer 測試:
    • 模擬一個 Client DEALER 連接上來,發送 READYCREDIT 消息。
    • 斷言 producer.clients Map 中正確創建了客戶端狀態。
    • 調用 producer.publish,如果客戶端有信用,斷言 socket.send 被正確調用。
    • 將客戶端信用設為 0,調用 producer.publish,斷言消息被放入了隊列,socket.send 未被調用。
    • 再模擬客戶端發送 CREDIT 消息,斷言隊列被清空,socket.send 被調用。
    • 測試心跳超時:可以通過 time-mocking 庫來模擬時間流逝,斷言超時的客戶端被從 clients Map 中移除。
  2. Consumer 測試:
    • 模擬一個 ROUTER Socket。
    • 啟動 Consumer 後,斷言它立即發送了 READY 消息。
    • 模擬 ROUTER 發送 EVENT 消息給 Consumer。
    • 斷言 handleEvent 被調用。
    • 通過控制 handleEvent 的調用次數,斷言當達到 creditBatchSize 時,Consumer 會發送 CREDIT 消息。

當前方案的局限性與未來展望

這個基於 ZeroMQ ROUTER/DEALER 和信用流控的方案,有效地解決了慢速消費者的問題,使我們的 CQRS 事件總線變得穩定可靠。它保留了 brokerless 的優點,同時增加了必要的韌性。

然而,這個方案並非銀彈,它存在一些固有的局限性:

  1. Producer 的單點故障 (SPOF): ResilientProducer 本身是一個單點。如果它崩潰,所有在途和緩存中的事件都會丟失。在生產環境中,需要對 Producer 做高可用部署,比如主備模式,這會增加架構的複雜性。
  2. 內存緩存的風險: 雖然我們有反壓機制,但如果所有 Consumer 都長時間處於慢速狀態,事件仍然會在 Producer 的內存隊列中無限堆積,最終導致 Producer 崩潰。一個改進方向是為 Producer 的總隊列設置一個上限,超過上限時可以選擇阻塞上游業務、丟棄非關鍵事件或將事件轉存到持久化存儲中。這就是所謂的“斷路器”模式。
  3. 事件持久性: 當前實現是純內存的。任何組件的重啟都會導致數據丟失。對於需要強一致性和可回溯性的場景,這個方案是不夠的。真正的事件溯源(Event Sourcing)系統需要一個持久化的、僅追加的日誌作為事件存儲(Event Store)。我們的 Producer 可以被看作是這個 Event Store 的一個分發層。一個可行的演進路徑是,在 Producer 發送事件前,先將其寫入到一個持久化日誌(如 PostgreSQL 的一個表,或專用的 Event Store 產品),然後再進行內存分發。這就實現了“事務性發件箱”(Transactional Outbox)模式。
  4. 新消費者加入: 如果一個新的 Query Service 實例上線,它如何獲取歷史狀態?當前設計無法滿足。它只能從上線的那一刻起接收新事件。完整的 CQRS 解決方案需要新服務能夠從 Event Store 中重放(Replay)所有歷史事件來構建自己的讀模型。

總之,我們構建的不是一個通用的消息隊列,而是一個特定場景下(CQRS 讀寫模型同步)的高性能、低延遲、有反壓能力的事件總線。它很好地平衡了性能和可靠性,但在持久化和系統恢復能力上做出了妥協。這在很多業務場景中是完全可以接受的權衡。下一步的迭代將聚焦於為 Producer 增加可選的持久化後端,並引入服務發現機制,使其更接近一個完備的平台級組件。


  目录