Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Example Analysis of Multi-process Model in Node.js

2025-01-18 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/03 Report--

This article will explain in detail the example analysis of the multi-process model in Node.js. The editor thinks it is very practical, so I share it for you as a reference. I hope you can get something after reading this article.

Cluster module

Node.js provides a Cluster module to solve the above problems. Through this module, developers can create a cluster by creating a sub-process mode to make full use of the resources of the machine or container. At the same time, the module allows many sub-processes to listen to the same port.

Example const cluster = require ('cluster'); const http = require (' http'); const numCPUs = require ('os'). Cpus (). Length;if (cluster.isMaster) {/ / Fork workers. For (let I = 0; I

< numCPUs; i++) { cluster.fork(); } cluster.on('exit', function(worker, code, signal) { console.log('worker ' + worker.process.pid + ' died'); });} else { // Workers can share any TCP connection // In this case it is an HTTP server http.createServer(function(req, res) { res.writeHead(200); res.end("hello world\n"); }).listen(8000);}通过代码解析创建子进程的过程 首先从 const cluster = require('cluster') 说起,这行代码导入了 Node 的 Cluster 模块,而在 Node 内部,Master 进程与 Worker 进程引入的文件却不一样,详情见如下代码: 'use strict';const childOrPrimary = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master';module.exports = require(`internal/cluster/${childOrPrimary}`); 不同的文件意味着两种进程在执行中的表现也不一样,例如: // internal/cluster/master.jscluster.isWorker = false;cluster.isMaster = true;// internal/cluster/child.jscluster.isWorker = true;cluster.isMaster = false; 这也是为什么 Cluster 模块到处的变量能区分不同类型进程的原因,接下来让我们分别从主、子进程两个方向去了解具体的过程 主进程 在上述代码里,Master 进程并没有做太多事情,只是根据 CPU 数量去 fork 子进程,那么我们深入到源代码里大致来看一下,相关描述均在代码的注释内 // lib/internal/cluster/master.js// 初始化clusterconst cluster = new EventEmitter();// 创建监听地址与server对应的mapconst handles = new SafeMap();// 初始化cluster.isWorker = false;cluster.isMaster = true;cluster.workers = {};cluster.settings = {};cluster.SCHED_NONE = SCHED_NONE; // Leave it to the operating system.cluster.SCHED_RR = SCHED_RR; // Master distributes connections.// 自增的子进程idlet ids = 0;// 向cluster添加fork方法cluster.fork = function(env) { // 初始化cluster.settings cluster.setupMaster(); // 为当前fork的子进程生成当前cluster内的唯一id const id = ++ids; // 创建子进程 const workerProcess = createWorkerProcess(id, env); // 创建对应的worker实例 const worker = new Worker({ id: id, process: workerProcess }); // 省略一些worker的事件监听.... // 监听内部消息事件,并交由onmessage处理 worker.process.on('internalMessage', internal(worker, onmessage)); // cluster发出fork事件 process.nextTick(emitForkNT, worker); // 将worker实例放在cluster.workers中维护 cluster.workers[worker.id] = worker; // 返回worker return worker;};// 创建子进程函数function createWorkerProcess(id, env) { // 将主进程的env、调用cluster.fork时传入的env以及NODE_UNIQUE_ID env构建成一个env对象 const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` }; // 执行参数 const execArgv = [...cluster.settings.execArgv]; // 省略debug模式相关逻辑... // 调用child_process模块的fork函数创建子进程并返回,至此子进程实例创建完成 return fork(cluster.settings.exec, cluster.settings.args, { cwd: cluster.settings.cwd, env: workerEnv, serialization: cluster.settings.serialization, silent: cluster.settings.silent, windowsHide: cluster.settings.windowsHide, execArgv: execArgv, stdio: cluster.settings.stdio, gid: cluster.settings.gid, uid: cluster.settings.uid });}// 内部消息事件处理函数function onmessage(message, handle) { const worker = this; if (message.act === 'online') online(worker); // 当子进程向主进程发出queryServer消息后,执行queryServer函数,创建server else if (message.act === 'queryServer') queryServer(worker, message); else if (message.act === 'listening') listening(worker, message); else if (message.act === 'exitedAfterDisconnect') exitedAfterDisconnect(worker, message); else if (message.act === 'close') close(worker, message);}// 获取serverfunction queryServer(worker, message) { // Stop processing if worker already disconnecting if (worker.exitedAfterDisconnect) return; // 创建当前子进程监听地址信息的key const key = `${message.address}:${message.port}:${message.addressType}:` + `${message.fd}:${message.index}`; // 在handles map中查询是否有已经创建好的该监听地址的server let handle = handles.get(key); // 没有对应的server则进行创建 if (handle === undefined) { let address = message.address; // Find shortest path for unix sockets because of the ~100 byte limit if (message.port < 0 && typeof address === 'string' && process.platform !== 'win32') { address = path.relative(process.cwd(), address); if (message.address.length < address.length) address = message.address; } // 主、子进程处理连接的方式,默认为轮询 let constructor = RoundRobinHandle; // UDP is exempt from round-robin connection balancing for what should // be obvious reasons: it's connectionless. There is nothing to send to // the workers except raw datagrams and that's pointless. if (schedulingPolicy !== SCHED_RR || message.addressType === 'udp4' || message.addressType === 'udp6') { constructor = SharedHandle; } // 将监听地址信息传入构造函数创建监听实例 handle = new constructor(key, address, message); // 缓存监听实例 handles.set(key, handle); } // 向server添加自定义信息,用于server发出listening事件后透传到worker if (!handle.data) handle.data = message.data; // 添加server发出listening事件后的回调函数通知子进程 handle.add(worker, (errno, reply, handle) =>

{const {data} = handles.get (key); if (errno) handles.delete (key); / / Gives other workers a chance to retry. Send (worker, {errno, key, ack: message.seq, data,... reply}, handle);}) } / / lib/internal/cluster/round_robin_handle.js// constructor. The argument is the key,ip address corresponding to server (for http (s)), listening related information function RoundRobinHandle (key, address, {port, fd, flags}) {/ / initialize handle this.key = key; this.all = new SafeMap (); this.free = new SafeMap (); this.handles = []; this.handle = null This.server = net.createServer (assert.fail); / / listens to file descriptors without discussing if (fd > = 0) this.server.listen ({fd}); / / listens to ip:port else if (port > = 0) {this.server.listen ({port, host: address, / / Currently, net module only supports `ipv6Only` option in `rooms`. Ipv6Only: Boolean (flags & constants.UV_TCP_IPV6ONLY),}); / / listen to UNIX socket without discussing} else this.server.listen (address); / / UNIX socket path. / / register the callback function this.server.once of the listening event issued by server ('listening', () = > {this.handle = this.server._handle; this.handle.onconnection = (err, handle) = > this.distribute (err, handle); this.server._handle = null; this.server = null;}) } / / add the callback function RoundRobinHandle.prototype.add = function (worker, send) {assert (this.all.has (worker.id) = false) passed in master.js after worker,server issues the listening event; this.all.set (worker.id, worker); const done = () = > {if (this.handle.getsockname) {const out = {}; this.handle.getsockname (out); / / TODO (bnoordhuis) Check err. Send (null, {sockname: out}, null);} else {send (null, null, null); / / UNIX socket. } this.handoff (worker); / / In case there are connections pending. }; if (this.server = = null) return done (); / / Still busy binding. This.server.once ('listening', done); this.server.once (' error', (err) = > {send (err.errno, null);});}; / / delete worker, which is no longer assigned to this workerRoundRobinHandle.prototype.remove = function (worker) {const existed = this.all.delete (worker.id); if (! existed) return false; this.free.delete (worker.id); if (this.all.size! = = 0) return false For (const handle of this.handles) {handle.close ();} this.handles = []; this.handle.close (); this.handle = null; return true;}; / / polling scheduling function RoundRobinHandle.prototype.distribute = function (err, handle) {ArrayPrototypePush (this.handles, handle); const [workerEntry] = this.free; / / this.free is a SafeMap if (ArrayIsArray (workerEntry)) {const {0: workerId, 1: worker} = workerEntry This.free.delete (workerId); this.handoff (worker);}}; / / hand over handle to workerRoundRobinHandle.prototype.handoff = function (worker) {if (! this.all.has (worker.id)) {return; / / Worker is closing (or has closed) the server. } const handle = ArrayPrototypeShift (this.handles); if (handle = undefined) {this.free.set (worker.id, worker); / / Add to ready queue again. Return;} / / issues newconn events to the worker const message = {act: 'newconn', key: this.key}; sendHelper (worker.process, message, handle, (reply) = > {if (reply.accepted) handle.close (); else this.distribute (0, handle); / / Worker is shutting down. Send to another. This.handoff (worker);});}; child process

In each child process, we create a HTTP Server, then execute the listen function to listen to port 8000, and the HTTP Server instance is inherited by the Net Server prototype chain. The listen function is the listen function on the Net Server prototype, as shown below:

/ / lib/_http_server.jsfunction Server (options, requestListener) {....} ObjectSetPrototypeOf (Server.prototype, net.Server.prototype); ObjectSetPrototypeOf (Server, net.Server) / / lib/net.jsServer.prototype.listen = function (... args) {/ / some parameters nomolize and other listening processing are omitted due to space / / after this logic, the listenInCluster function is called to the real listening port if (typeof options.port = = 'number' | | typeof options.port = =' string') {validatePort (options.port, 'options.port'); backlog = options.backlog | | backlogFromArgs / / start TCP server listening on host:port if (options.host) {lookupAndListen (this, options.port | 0, options.host, backlog, options.exclusive, flags);} else {/ / Undefined host, listens on unspecified address / / Default addressType 4 will be used to search for master server listenInCluster (this, null, options.port | 0,4, backlog, undefined, options.exclusive) } return this;} / / omit.}; / / Cluster listening function function listenInCluster (server, address, port, addressType, backlog, fd, exclusive, flags) {exclusive =!! exclusive; if (cluster = undefined) cluster = require ('cluster') / / determine whether it is master. Cluster.isMasterdefaults to true in a single process, then listens and returns if (cluster.isMaster | | exclusive) {/ / Will create a new handle / / _ listen2 sets up the listened handle, it is still named like this / / to avoid breaking code that wraps this method server._listen2 (address, port, addressType, backlog, fd, flags); return } / / in a child process, the listener address information is passed into the _ getServer function in the cluster instance to get a faux handle const serverQuery = {address: address, port: port, addressType: addressType, fd: fd, flags,}; / / Get the master's server handle, and listen on it cluster._getServer (server, serverQuery, listenOnMasterHandle) / / get the net server callback function. After getting the faux handle, call the _ listen2 function, that is, the setupListenHandle function function listenOnMasterHandle (err, handle) {err = checkBindError (err, port, handle); if (err) {const ex = exceptionWithHostPort (err, 'bind', address, port); return server.emit (' error', ex);} / / Reuse master's server handle server._handle = handle / _ listen2 sets up the listened handle, it is still named like this / / to avoid breaking code that wraps this method server._listen2 (address, port, addressType, backlog, fd, flags);}} / enable snooping handlefunction setupListenHandle (address, port, addressType, backlog, fd, flags) {debug ('setupListenHandle', address, port, addressType, backlog, fd) / / as stated in the English comments, if there is no listening handle, create it. If there is a listening handle, skip / / If there is not yet a handle, we need to create one and bind. / / In the case of a server sent via IPC, we don't need to do this. If (this._handle) {debug ('setupListenHandle: have a handle already');} else {debug (' setupListenHandle: create a handle'); let rval = null; / / the code to create the listening handle. This._handle = rval;} / / set the onconnection function on the faux handle set on this to listen for connection entry this._handle.onconnection = onconnection;}

At the same time, at the beginning of parsing, we said that when the Cluster module is introduced, it will determine whether it is a child process according to whether the env of the current process contains NODE_UNIQUE_ID, and if so, execute the child.js file

If the value of message.cmd sent in Tips:IPC communication is prefixed with NODE, it will respond to an internal event internalMessage

/ / lib/internal/cluster/child.js// initialization const cluster = new EventEmitter (); / / store the generated faux handleconst handles = new SafeMap (); / / store the corresponding relationship between the listening address and the listening address index const indexes = new SafeMap (); cluster.isWorker = true;cluster.isMaster = false;cluster.worker = null;cluster.Worker = Worker / / this function is executed when the child process starts and initializes. After the execution is completed, the NODE_UNIQUE_ID environment variable / / in env is deleted. For more information, please see the initializeClusterIPC function cluster._setupWorker = function () {/ / initialize the worker instance const worker = new Worker ({id: + process.env.NODE_UNIQUE_ID | 0, process: process, state: 'online'}) in lib/internal/bootstrap/pre_excution.js. Cluster.worker = worker; / / handles disconnect events process.once ('disconnect', () = > {worker.emit (' disconnect'); if (! worker.exitedAfterDisconnect) {/ / Unexpected disconnect, master exited, or some such nastiness, so / / worker exits immediately. Process.exit (0);}}); / / IPC intercom event listening process.on ('internalMessage', internal (worker, onmessage)); send ({act:' online'}); function onmessage (message, handle) {/ / if it is a new connection, the resulting handle is passed to the HTTP Server if (message.act = = 'newconn') onconnection (message, handle) started in the child process by executing the onconnection function Else if (message.act = 'disconnect') ReflectApply (_ disconnect, worker, [true]);}}; / / add the get server function, which will be executed / / `obj` is a net#Server or a dgram#Socket object.cluster._getServer = function (obj, options, cb) {let address = options.address; / / Resolve unix socket paths to absolute paths if (options.port) when the net server listens on the port

< 0 && typeof address === 'string' && process.platform !== 'win32') address = path.resolve(address); // 生成地址信息的的key const indexesKey = ArrayPrototypeJoin( [ address, options.port, options.addressType, options.fd, ], ':'); // 检查是否缓存了indexedKey,如果没有,则表明是新的监听地址,在 master.js 中会生成新的net server let index = indexes.get(indexesKey); if (index === undefined) index = 0; else index++; // 设置 indexesKey 与 index的对应关系 indexes.set(indexesKey, index); // 传递地址信息及index const message = { act: 'queryServer', index, data: null, ...options }; message.address = address; // Set custom data on handle (i.e. tls tickets key) if (obj._getServerData) message.data = obj._getServerData(); // 向主进程发送queryServer消息 send(message, (reply, handle) =>

{if (typeof obj._setServerData = 'function') obj._setServerData (reply.data); / / execute the corresponding load balancer code according to the processing of adding worker to the corresponding load balancer handle, and execute the cb function / / polling without passing handle, and the corresponding code is if (handle) shared (reply, handle, indexesKey, cb) in RoundRobinHandle.prototype.add; / / Shared listen socket. Else rr (reply, indexesKey, cb); / / Round-robin. }); obj.once ('listening', () = > {cluster.worker.state =' listening'; const address = obj.address (); message.act = 'listening'; message.port = (address & & address.port) | | options.port; send (message);});}; / / create faux handle and save its correspondence / / Round-robin. Master distributes handles across workers.function rr (message, indexesKey, cb) {if (message.errno) return cb (message.errno, null); let key = message.key; function listen (backlog) {/ / TODO (bnoordhuis) Send a message to the master that tells it to / / update the backlog size. The actual backlog should probably be / / the largest requested size by any worker. Return 0;} function close () {/ / lib/net.js treats server._handle.close () as effectively synchronous. / / That means there is a time window between the call to close () and / / the ack by the master process in which we can still receive handles. / / onconnection () below handles that by sending those handles back to / / the master. If (key = undefined) return; send ({act: 'close', key}); handles.delete (key); indexes.delete (indexesKey); key = undefined;} function getsockname (out) {if (key) ObjectAssign (out, message.sockname); return 0;} / / create Faux handle / / Faux handle. Mimics a TCPWrap with just enough fidelity to get away / / with it. Fools net.Server into thinking that it's backed by a real / / handle. Use a noop function for ref () and unref () because the control / / channel is going to keep the worker alive anyway. Const handle = {close, listen, ref: noop, unref: noop}; if (message.sockname) {handle.getsockname = getsockname; / / TCP handles only. } assert (handles.has (key) = false); / / Save faux handle handles.set (key, handle); / / execute the callback function cb (0, handle) passed through the cluster._getServer function called by the net module;} / / process the request / / Round-robin connection.function onconnection (message, handle) {/ / get the key const key of faux handle = message.key; / / get faux hadle const server = handles.get (key); const accepted = server! = = undefined Send ({ack: message.seq, accepted}); / / call the connection handler set for the faux handle in the setupListenHandle function in the net module to process the request if (accepted) server.onconnection (0, handle);}

At this point, all the contents are linked.

Why can multiple child processes listen on the same port

In the previous code analysis, we know that the Cluster cluster will create a Net Server in the Master process. When the Worker process runs to create a HTTP Server, it will pass the listening address information into the cluster._getServer function to create a faux handle and set it to the Net Server of the child process. When the Worker process initializes, it will register the IPC communication callback function, which is in the callback function. Call the {faux handle} .onconnection function initialized by the Net Server module in the child process, and pass in the handle of the passed connection to complete the request response.

How to ensure the robustness of cluster work

We can listen to the error, disconntect, exit events of the Worker process in the Master process, and do the corresponding processing in these events, such as cleaning up the exited process and re-fork, or using the encapsulated npm package, such as cfork

Egg.js multi-process model

In Egg.js 's multi-process model, another process type is added, that is, Agent process, which is mainly used to deal with things that are difficult for multiple processes to handle and to reduce the number of long links. The specific relationship is as follows:

+-+ | Master | | Agent | | Worker | +-+-fork agent | | +-> | agent ready | worker ready | | Egg ready | | +- -- > |

In the egg-cluster package, the cfork package is used to ensure that the Worker process restarts automatically when it dies.

Problem record

In one of our Egg applications, the log system does not use Egg native logs, but uses an internal log library based on log4js packages. When using it, extend the Logger you need to use to the Application object. In this way, each Worker process will create a new Logger during initialization, that is, there will be the problem of multi-process logging, but there is no error problem of multi-process logging.

In the process of tracing the source code, it is found that although log4js provides Cluster mode, the Cluster mode of log4js is not enabled in the upper encapsulation, so the appender of each Logger uses flag a to open a write stream, and there is no answer here.

Later, the answer was found in CNode. The libuv file pool implementation corresponding to the writable stream opened with flag a under unix is UV_FS_O_APPEND, that is, O_APPEND, and O_APPEND itself is defined as an atomic operation in the man manual. The kernel ensures that concurrent writing to this writable stream is safe without adding locks on the application layer (except for file information loss or corruption caused by concurrent writes on NFS file systems). The network-mounted file system of the NFS class mainly simulates the local operations of the class by simulating the underlying api, so it is obviously impossible to restore this kind of atomic operation api perfectly under competitive conditions, so you can't do this if you want to write your log to something similar to the local oss cloud disk mount. If you write multiple processes, you must manually add locks in the application layer.

This is the end of this article on "sample analysis of multi-process models in Node.js". I hope the above content can be helpful to you, so that you can learn more knowledge. if you think the article is good, please share it for more people to see.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report