標籤:

02.Egg源碼解讀:多進程模型和進程間通訊

之前介紹了egg啟動前的一些準備工作,接下來繼續了解下啟動的核心部分

多進程模式啟動

下面來了解下start-cluster的運行機制,start-cluster的路徑是node_modules/egg-bin/lib/start-cluster

#!/usr/bin/env nodeuse strict;const debug = require(debug)(egg-bin:start-cluster);const options = JSON.parse(process.argv[2]);debug(start cluster options: %j, options);require(options.framework).startCluster(options);

options.framework是/node_modules/egg,入口文件是node_modules/egg/index.js

use strict;/** * @namespace Egg *//** * Start egg application with cluster mode * @since 1.0.0 */exports.startCluster = require(egg-cluster).startCluster;

egg-cluster的入口是node_modules/egg-cluster/index.js

use strict;const Master = require(./lib/master);/** * cluster start flow: * * [startCluster] -> master -> agent_worker -> new [Agent] -> agentWorkerLoader * `-> app_worker -> new [Application] -> appWorkerLoader * *//** * start egg app * @method Egg#startCluster * @param {Object} options {@link Master} * @param {Function} callback start success callback */exports.startCluster = function(options, callback) { new Master(options).ready(callback);};

幾種進程的啟動流程如下(參見《多進程模型和進程間通訊》):

  1. Master 啟動後先 fork Agent 進程
  2. Agent 初始化成功後,通過 IPC 通道通知 Master
  3. Master 再 fork 多個 App Worker
  4. App Worker 初始化成功,通知 Master
  5. 所有的進程初始化成功後,Master 通知 Agent 和 Worker 應用啟動成功

第一步:Master 啟動後先 fork Agent 進程,在啟動之前,先檢測可用的埠號

detectPort((err, port) => { /* istanbul ignore if */ if (err) { err.name = ClusterPortConflictError; err.message = [master] try get free port error, + err.message; this.logger.error(err); process.exit(1); return; } this.options.clusterPort = port; this.forkAgentWorker();});

forkAgentWorker方法中,使用child_process.fork創建子進程。此處使用child_process方式fork,沒有使用cluster的fork;關於兩者區別,以及異常的處理、進程退出等問題,推薦幾篇文章,可以進一步了解:《子進程的優雅退出》、《Graceful exit with cluster and pm》、《Node.js 非同步異常的處理與domain模塊解析》、《當我們談論 cluster 時我們在談論什麼(上)》、《當我們談論 cluster 時我們在談論什麼(下)》

forkAgentWorker() { const agentWorker = this.agentWorker = childprocess.fork(agentWorkerFile, args, opt); agentWorker.id = ++this.agentWorkerIndex; this.log([master] agent_worker#%s:%s start with clusterPort:%s, agentWorker.id, agentWorker.pid, this.options.clusterPort);}

第二步:Agent 初始化成功後,通過 IPC 通道通知 Master

agent_worker.js,agent啟動後通過 IPC 通道通知 Master

agent.ready(() => { agent.removeListener(error, startErrorHandler); process.send({ action: agent-start, to: master });});

第三步:Master 再 fork 多個 App Worker

master.js,監聽agent-start事件

// fork app workers after agent startedthis.once(agent-start, this.forkAppWorkers.bind(this));

forkAppWorkers方法中使用cfork模塊,通過cluster.fork方式創建

forkAppWorkers() { cfork({ exec: appWorkerFile, args, silent: false, count: this.options.workers, // dont refork in local env refork: this.isProduction, });}

第四步:App Worker 初始化成功,通知 Master

master.js

cluster.on(listening, (worker, address) => { this.messenger.send({ action: app-start, data: { workerPid: worker.process.pid, address }, to: master, from: app, });});

第五步:所有的進程初始化成功後,Master 通知 Agent 和 Worker 應用啟動成功

master.js

this.ready(() => { this.isStarted = true; const stickyMsg = this.options.sticky ? with STICKY MODE! : ; this.logger.info([master] %s started on %s (%sms)%s, frameworkPkg.name, this[APP_ADDRESS], Date.now() - startTime, stickyMsg); const action = egg-ready; this.messenger.send({ action, to: parent, data: { port: this[REALPORT], address: this[APP_ADDRESS] } }); this.messenger.send({ action, to: app, data: this.options }); this.messenger.send({ action, to: agent, data: this.options });});

進程守護

針對未捕獲異常,進程可以優雅退出,順序如下:

  1. 關閉異常 Worker 進程所有的 TCP Server(將已有的連接快速斷開,且不再接收新的連接),斷開和 Master 的 IPC 通道,不再接受新的用戶請求。
  2. Master 立刻 fork 一個新的 Worker 進程,保證在線的『工人』總數不變。
  3. 異常 Worker 等待一段時間,處理完已經接受的請求後退出。

第一步:

graceful/index.js

process.on(uncaughtException, function (err) { //... servers.forEach(function (server) { if (server instanceof http.Server) { server.on(request, function (req, res) { // Let http server set `Connection: close` header, and close the current request socket. req.shouldKeepAlive = false; res.shouldKeepAlive = false; if (!res._header) { res.setHeader(Connection, close); } }); } }); //... // 斷開和 Master 的 IPC 通道,不再接受新的用戶請求 // cluster mode if (worker) { try { // stop taking new requests. // because server could already closed, need try catch the error: `Error: Not running` for (var i = 0; i < servers.length; i++) { var server = servers[i]; server.close(); console.error([%s] [graceful:worker:%s] close server#%s, _connections: %s, Date(), process.pid, i, server._connections); } console.error([%s] [graceful:worker:%s] close %d servers!, Date(), process.pid, servers.length); } catch (er1) { // Usually, this error throw cause by the active connections after the first domain error, // oh well, not much we can do at this point. console.error([%s] [graceful:worker:%s] Error on server close!
%s
, Date(), process.pid, er1.stack); } try { // Let the master know were dead. This will trigger a // disconnect in the cluster master, and then it will fork // a new worker. worker.send(graceful:disconnect); worker.disconnect(); console.error([%s] [graceful:worker:%s] worker disconnect!, Date(), process.pid); } catch (er2) { // Usually, this error throw cause by the active connections after the first domain error, // oh well, not much we can do at this point. console.error([%s] [graceful:worker:%s] Error on worker disconnect!
%s
, Date(), process.pid, er2.stack); } }});

第二步:Master 立刻 fork 一個新的 Worker 進程,cfork

cfork/index.js

cluster.on(disconnect, function (worker) { //... disconnects[worker.process.pid] = utility.logDate(); if (allow()) { newWorker = forkWorker(worker._clusterSettings); newWorker._clusterSettings = worker._clusterSettings; log([%s] [cfork:master:%s] new worker:%s fork (state: %s), utility.logDate(), process.pid, newWorker.process.pid, newWorker.state); } else { log([%s] [cfork:master:%s] don fork new work (refork: %s), utility.logDate(), process.pid, refork); }});

第三步:異常 Worker 等待一段時間,處理完已經接受的請求後退出

graceful/index.js

// make sure we close down within `killTimeout` secondsvar killtimer = setTimeout(function () { console.error([%s] [graceful:worker:%s] kill timeout, exit now., Date(), process.pid); if (process.env.NODE_ENV !== test) { process.exit(1); }}, killTimeout);console.error([%s] [graceful:worker:%s] will exit after %dms, Date(), process.pid, killTimeout);

當OOM、系統異常時,不像未捕獲異常發生時我們還有機會讓進程繼續執行,只能夠讓當前進程直接退出,Master 立刻 fork 一個新的 Worker

cfork/index.js

cluster.on(exit, function (worker, code, signal) { var isExpected = !!disconnects[worker.process.pid]; var isDead = worker.isDead && worker.isDead(); var propertyName = worker.hasOwnProperty(exitedAfterDisconnect) ? exitedAfterDisconnect : suicide; console.error([%s] [cfork:master:%s] worker:%s exit (code: %s, %s: %s, state: %s, isDead: %s, isExpected: %s, worker.disableRefork: %s), utility.logDate(), process.pid, worker.process.pid, code, propertyName, worker[propertyName], worker.state, isDead, isExpected, worker.disableRefork); if (isExpected) { delete disconnects[worker.process.pid]; // worker disconnect first, exit expected return; } if (worker.disableRefork) { // worker is killed by master return; } unexpectedCount++; if (allow()) { newWorker = forkWorker(worker._clusterSettings); newWorker._clusterSettings = worker._clusterSettings; console.error([%s] [cfork:master:%s] new worker:%s fork (state: %s), utility.logDate(), process.pid, newWorker.process.pid, newWorker.state); } else { console.error([%s] [cfork:master:%s] don fork new work (refork: %s), utility.logDate(), process.pid, refork); } cluster.emit(unexpectedExit, worker, code, signal); });

進程間通訊(IPC)

進程間的通信模型:

廣播消息: agent => all workers

                  +--------+          +-------+
                  | Master |<---------| Agent |
                  +--------+          +-------+
                 /    |     \
                /     |      \
               /      |       \
              /       |        \
             v        v         v
  +----------+   +----------+   +----------+
  | Worker 1 |   | Worker 2 |   | Worker 3 |
  +----------+   +----------+   +----------+

指定接收方: one worker => another worker

                  +--------+          +-------+
                  | Master |----------| Agent |
                  +--------+          +-------+
                 ^    |
     send to    /     |
    worker 2   /      |
              /       |
             /        v
  +----------+   +----------+   +----------+
  | Worker 1 |   | Worker 2 |   | Worker 3 |
  +----------+   +----------+   +----------+

舉例:通知所有app workers進行某個操作

app/controller/home.js,觸發refresh事件,參數為pull,接著調用messenger.js中的sendToApp方法

ctx.app.messenger.sendToApp(refresh, pull);

egg/lib/core/messenger.js,實際調用sendmessage

/** * send message to app * @param {String} action - message key * @param {Object} data - message value * @return {Messenger} this */ sendToApp(action, data) { debug([%s] send %s with %j to all app, this.pid, action, data); this.send(action, data, app); return this; } /** * @param {String} action - message key * @param {Object} data - message value * @param {String} to - let master know how to send message * @return {Messenger} this */ send(action, data, to) { sendmessage(process, { action, data, to, }); return this; }

sendmessage/index.js,此處child值為process,經過一系列判斷之後,執行child.send,子進程發送消息,觸發master.js中的message事件

module.exports = function send(child, message) { if (typeof child.send !== function) { // not a child process return setImmediate(child.emit.bind(child, message, message)); } if (IS_NODE_DEV_RUNNER || process.env.SENDMESSAGE_ONE_PROCESS) { // run with node-dev, only one process // https://github.com/node-modules/sendmessage/issues/1 return setImmediate(child.emit.bind(child, message, message)); } // cluster.fork(): child.process is process // childprocess.fork(): child is process var connected = child.process ? child.process.connected : child.connected; if (connected) { return child.send(message); } // just log warnning message var pid = child.process ? child.process.pid : child.pid; var err = new Error(channel closed); console.warn([%s][sendmessage] WARN pid#%s channel closed, nothing send
stack: %s
, Date(), pid, err.stack);};

egg-cluster/lib/master.js,執行this.messenger.send,接著會調用utils/messenger.js中的send方法

forkAppWorkers() { this.appStartTime = Date.now(); this.isAllAppWorkerStarted = false; this.startSuccessCount = 0; this.workers = new Map(); const args = [ JSON.stringify(this.options) ]; this.log([master] start appWorker with args %j, args); cfork({ exec: appWorkerFile, args, silent: false, count: this.options.workers, // dont refork in local env refork: this.isProduction, }); let debugPort = process.debugPort; cluster.on(fork, worker => { worker.disableRefork = true; this.workers.set(worker.process.pid, worker); worker.on(message, msg => { if (typeof msg === string) msg = { action: msg, data: msg }; msg.from = app; this.messenger.send(msg); }); this.log([master] app_worker#%s:%s start, state: %s, current workers: %j, worker.id, worker.process.pid, worker.state, Object.keys(cluster.workers)); // send debug message, due to `brk` scence, send here instead of app_worker.js if (this.options.isDebug) { debugPort++; this.messenger.send({ to: parent, from: app, action: debug, data: { debugPort, pid: worker.process.pid } }); } }); //省略... }

egg-cluster/lib/utils/messenger.js,首先調用send方法,經過判斷之後執行sendToAppWorker方法,繼續調用sendmessage方法,此處的child為子進程worker,當worker.send執行時,會觸發process的message事件

class Messenger { constructor(master) { this.master = master; process.on(message, msg => { msg.from = parent; this.send(msg); }); } /** * send message * @param {Object} data message body * - {String} from from who * - {String} to to who */ send(data) { if (!data.from) { data.from = master; } // default from -> to rules if (!data.to) { if (data.from === agent) data.to = app; if (data.from === app) data.to = agent; if (data.from === parent) data.to = master; } // app -> master // agent -> master if (data.to === master) { debug(%s -> master, data: %j, data.from, data); // app/agent to master this.sendToMaster(data); return; } // master -> parent // app -> parent // agent -> parent if (data.to === parent) { debug(%s -> parent, data: %j, data.from, data); this.sendToParent(data); return; } // parent -> master -> app // agent -> master -> app if (data.to === app) { debug(%s -> %s, data: %j, data.from, data.to, data); this.sendToAppWorker(data); return; } // parent -> master -> agent // app -> master -> agent,可能不指定 to if (data.to === agent) { debug(%s -> %s, data: %j, data.from, data.to, data); this.sendToAgentWorker(data); return; } } /** * send message to master self * @param {Object} data message body */ sendToMaster(data) { this.master.emit(data.action, data.data); } /** * send message to parent process * @param {Object} data message body */ sendToParent(data) { process.send && process.send(data); } /** * send message to app worker * @param {Object} data message body */ sendToAppWorker(data) { for (const id in cluster.workers) { const worker = cluster.workers[id]; if (worker.state === disconnected) { continue; } // check receiverPid if (data.receiverPid && data.receiverPid !== String(worker.process.pid)) { continue; } sendmessage(worker, data); } } /** * send message to agent worker * @param {Object} data message body */ sendToAgentWorker(data) { if (this.master.agentWorker) { sendmessage(this.master.agentWorker, data); } }}

egg/lib/core/messenger.js,執行this._onMessage,this.emit觸發action事件,同時傳遞參數

constructor() { super(); this.pid = String(process.pid); // pids of agent or app maneged by master // - retrieve app worker pids when its an agent worker // - retrieve agent worker pids when its an app worker this.opids = []; this.on(egg-pids, pids => { this.opids = pids; }); this._onMessage = this._onMessage.bind(this); process.on(message, this._onMessage); } _onMessage(message) { if (message && is.string(message.action)) { debug([%s] got message %s with %j, receiverPid: %s, this.pid, message.action, message.data, message.receiverPid); this.emit(message.action, message.data); } }

app.js,監聽refresh事件

app.messenger.on(refresh, by => { app.logger.info(start update by %s, by);});

以上步驟完成整個進程之間的一次通信。

具體流程可參考下圖

IPC調用關係


推薦閱讀:

如何理解 Ryan Dahl 最近專訪中的言論「Node 也許不是構建大型服務的最佳選擇」?
php 和nodejs 的各自優勢有哪些,如果無基礎開始,要學那個好?
node-webkit有無辦法調用C#的庫?
為什麼nodejs不給每一個.js文件以獨立的上下文來避免作用域被污染?
Object.create Reflect.setPrototypeOf 哪個比較好?

TAG:Nodejs |