How to analyze worker threads through the Node.js source code

2025-04-04 Update From: SLTechnology News&Howtos shulou

Shulou (Shulou.com) 06/01

This article explains in detail how worker threads are implemented by walking through the Node.js source code. It is shared here as a reference, and should give you a working understanding of the topic by the end.

Let's start with a typical usage example.

const { Worker, isMainThread, parentPort } = require('worker_threads');
if (isMainThread) {
  const worker = new Worker(__filename);
  worker.once('message', (message) => {
    ...
  });
  worker.postMessage('Hello, world!');
} else {
  // do something time-consuming
  parentPort.once('message', (message) => {
    parentPort.postMessage(message);
  });
}

Let's first look at what this code means. The same file is executed once in the main thread and once in the child thread, so the code first uses isMainThread to determine which thread it is running in. In the main thread, it creates a child thread and then listens for messages sent by that child thread. In the child thread, it first runs the business-related code, and it can also listen for messages passed from the main thread. Now let's analyze the source code. Once we are done, the code above will be easier to understand.
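The round trip described above can be sketched as a single runnable file. This is only an illustration under some assumptions: the child code is passed as a string with the eval option instead of __filename, and echoThroughWorker is a hypothetical helper name that does not appear in the original example.

```javascript
const { Worker } = require('worker_threads');

// Source of the child thread, passed as a string so the sketch fits in one file.
const childSource = `
  const { parentPort } = require('worker_threads');
  // Echo the first message from the main thread back to it.
  parentPort.once('message', (message) => {
    parentPort.postMessage(message);
  });
`;

// Hypothetical helper: start a worker, send one message, resolve with the reply.
function echoThroughWorker(message) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(childSource, { eval: true });
    worker.once('message', resolve); // reply sent by the child thread
    worker.once('error', reject);    // uncaught exception in the child thread
    worker.postMessage(message);     // message sent to the child thread
  });
}

echoThroughWorker('Hello, world!').then((reply) => {
  console.log(reply);
});
```

Wrapping the exchange in a promise is a design choice of this sketch; the article's example uses plain event listeners.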

First, let's start with the worker_threads module. Its core is a C++ module, so let's look at what it exports. What require('worker_threads') ultimately exposes is what the InitWorker function exports.

void InitWorker(Local<Object> target,
                Local<Value> unused,
                Local<Context> context,
                void* priv) {
  Environment* env = Environment::GetCurrent(context);
  {
    // When the methods registered below run, their receiver is an object created by calling new on w->GetFunction().
    // Create a new function template. Worker::New is the callback executed when new is called on w->GetFunction().
    Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
    // Reserve one internal field on each instance (the layout of a C++ object is fixed, so extra slots must be declared up front)
    w->InstanceTemplate()->SetInternalFieldCount(1);
    w->Inherit(AsyncWrap::GetConstructorTemplate(env));
    // Set a series of prototype methods (not all of them are listed here)
    env->SetProtoMethod(w, "setEnvVars", Worker::SetEnvVars);
    // ... more prototype methods
    // Export the function created from the function template, i.e. the Worker in const { Worker } = require("worker_threads");
    Local<String> workerString =
        FIXED_ONE_BYTE_STRING(env->isolate(), "Worker");
    w->SetClassName(workerString);
    target->Set(env->context(),
                workerString,
                w->GetFunction(env->context()).ToLocalChecked()).Check();
  }
  // Export the getEnvMessagePort method: const { getEnvMessagePort } = require("worker_threads")
  env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
  /*
    Thread id. It is not assigned by the operating system but by Node.js,
    and it is set when a new thread is started.
    const { threadId } = require("worker_threads")
  */
  target
      ->Set(env->context(),
            env->thread_id_string(),
            Number::New(env->isolate(), static_cast<double>(env->thread_id())))
      .Check();
  /*
    Whether this is the main thread: const { isMainThread } = require("worker_threads")
    The variable is set to true when Node.js starts, but it is not set when
    a new child thread is started, so there it is false.
  */
  target
      ->Set(env->context(),
            FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"),
            Boolean::New(env->isolate(), env->is_main_thread()))
      .Check();
  /*
    If this is not the main thread, export the resource limit configuration,
    i.e. what const { resourceLimits } = require("worker_threads") returns in the child thread.
  */
  if (!env->is_main_thread()) {
    target
        ->Set(env->context(),
              FIXED_ONE_BYTE_STRING(env->isolate(), "resourceLimits"),
              env->worker_context()->GetResourceLimits(env->isolate()))
        .Check();
  }
  // Export several constants
  NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
  NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
  NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
  NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
}
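To see some of these exports from user code, here is a small sketch that prints isMainThread and threadId in the main thread and reports what a child thread sees. The helper name inspectWorkerExports and the inline child source are assumptions made for illustration; they are not part of the Node.js source.

```javascript
const { Worker, isMainThread, threadId } = require('worker_threads');

// Child source as a string (eval mode) so the sketch stays in one file.
const childSource = `
  const { parentPort, isMainThread, threadId, resourceLimits } =
    require('worker_threads');
  // Report the values seen inside the child thread.
  parentPort.postMessage({ isMainThread, threadId, resourceLimits });
`;

// Hypothetical helper: resolve with the values the child thread sees.
function inspectWorkerExports() {
  return new Promise((resolve, reject) => {
    const worker = new Worker(childSource, { eval: true });
    worker.once('message', resolve);
    worker.once('error', reject);
  });
}

console.log('main thread:', { isMainThread, threadId });
inspectWorkerExports().then((seenByChild) => {
  console.log('child thread:', seenByChild);
});
```

In the main thread isMainThread is true and threadId is 0; in the child thread isMainThread is false and threadId is a positive number assigned by Node.js.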

Translated into JavaScript, this is roughly:

// CppWorker stands for the Worker class in the C++ layer
function CppWorker(object) {
  // Associate the two objects. Later, when a C++-layer function is called from the JS layer,
  // this slot is read to get the real worker object in the C++ layer.
  object[0] = this;
  ...
}
function New(object) {
  const worker = new CppWorker(object);
}
function Worker() {
  New(this);
}
Worker.prototype = {
  startThread: StartThread,
  stopThread: StopThread,
  ...
};
module.exports = {
  Worker: Worker,
  getEnvMessagePort: GetEnvMessagePort,
  isMainThread: true | false,
  ...
};
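The association between the JS object and the C++ object in the pseudocode above can be modeled in plain JavaScript. This is a toy model only: a WeakMap stands in for internal field 0, and NativeWorker, WorkerHandle, wrap and unwrap are hypothetical names. The real unwrap macro returns early instead of throwing.

```javascript
// Stand-in for the C++ Worker object; in Node.js this lives in native memory.
class NativeWorker {
  constructor(threadId) {
    this.threadId = threadId;
  }
}

// Stand-in for internal field 0 of each JS wrapper object.
const internalField = new WeakMap();

// Roughly what the New callback achieves: store the native object
// in the wrapper's internal slot.
function wrap(jsObject, nativeObject) {
  internalField.set(jsObject, nativeObject);
}

// Roughly what unwrapping does: recover the native object from the wrapper.
function unwrap(jsObject) {
  const nativeObject = internalField.get(jsObject);
  if (nativeObject === undefined) {
    throw new TypeError('Illegal invocation: object was not wrapped');
  }
  return nativeObject;
}

class WorkerHandle {
  constructor(threadId) {
    wrap(this, new NativeWorker(threadId));
  }

  // A prototype method first unwraps `this`, then works on the native object.
  describe() {
    return `native worker for thread ${unwrap(this).threadId}`;
  }
}

const handle = new WorkerHandle(1);
console.log(handle.describe()); // prints "native worker for thread 1"
```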

Now that we know what the worker_threads module exports, let's look at the logic of new Worker. From the code above, we know that a new object is created first; it corresponds to this in the Worker function above. The New callback is then executed with this passed in. Let's look at the logic of the New function. Omitting a series of parameter handling, the main code is as follows.

// args.This() is the this we just passed in
Worker* worker = new Worker(env, args.This(),
                            url, per_isolate_opts,
                            std::move(exec_argv_out));

Let's take a look at the Worker class.

Worker::Worker(Environment* env,
               Local<Object> wrap, ...)
    // The parent-class constructor associates this Worker object with the args.This() object
    : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
      ...
      // Assign the thread id
      thread_id_(Environment::AllocateThreadId()),
      env_vars_(env->env_vars()) {
  // Create a new port for communicating with the child thread
  parent_port_ = MessagePort::New(env, env->context());
  /*
    Associate the two sides with each other for communication:
    const parent_port_ = { data: { sibling: null } };
    const child_port_data_ = { sibling: null };
    parent_port_.data.sibling = child_port_data_;
    child_port_data_.sibling = parent_port_.data;
  */
  child_port_data_ = std::make_unique<MessagePortData>(nullptr);
  MessagePort::Entangle(parent_port_, child_port_data_.get());
  // Set the messagePort property of the Worker object to parent_port_
  object()->Set(env->context(),
                env->message_port_string(),
                parent_port_->object()).Check();
  // Set the thread id of the Worker object, i.e. the threadId property
  object()->Set(env->context(),
                env->thread_id_string(),
                Number::New(env->isolate(), static_cast<double>(thread_id_)))
      .Check();
}

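A newly created Worker therefore has the following structure: its messagePort property refers to parent_port_, and the data of parent_port_ and child_port_data_ reference each other as siblings.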
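The sibling relationship set up in the constructor can be sketched as a toy model in JavaScript. PortData, entangle and post are hypothetical names, and dropping a message when there is no peer is a simplification chosen for this sketch, not a statement about the real MessagePortData.

```javascript
// Toy model of a port's data: a message queue plus a pointer to its sibling.
class PortData {
  constructor() {
    this.sibling = null;
    this.queue = [];
  }
}

// Link two port-data objects so that each one is the other's sibling.
function entangle(a, b) {
  a.sibling = b;
  b.sibling = a;
}

// Posting on one side appends to the queue owned by the other side.
function post(from, message) {
  if (from.sibling === null) return false; // no peer (toy-model choice)
  from.sibling.queue.push(message);
  return true;
}

const parentPortData = new PortData();
const childPortData = new PortData();
entangle(parentPortData, childPortData);

post(parentPortData, { type: 'LOAD_SCRIPT' });
console.log(childPortData.queue.length);  // prints 1
console.log(parentPortData.queue.length); // prints 0
```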

Now that we understand the logic of new Worker in the C++ layer, let's see how it is used in the JS layer. Here is the constructor of the Worker class in the JS layer.

constructor(filename, options = {}) {
  super();
  // A series of parameter handling is omitted. The Worker here is the C++-layer one described above.
  this[kHandle] = new Worker(url, options.execArgv, parseResourceLimits(options.resourceLimits));
  // messagePort is the property set in the C++ Worker constructor; it refers to parent_port_
  this[kPort] = this[kHandle].messagePort;
  this[kPort].on('message', (data) => this[kOnMessage](data));
  // Start receiving messages. We will not go deeper into messagePort here; it is analyzed separately.
  this[kPort].start();
  // Create a communication channel with two ports
  const { port1, port2 } = new MessageChannel();
  this[kPublicPort] = port1;
  this[kPublicPort].on('message', (message) => this.emit('message', message));
  // Send a message to the other end
  this[kPort].postMessage({
    argv,
    type: messageTypes.LOAD_SCRIPT,
    filename,
    doEval: !!options.eval,
    cwdCounter: cwdCounter || workerIo.sharedCwdCounter,
    workerData: options.workerData,
    publicPort: port2,
    manifestSrc: getOptionValue('--experimental-policy') ?
      require('internal/process/policy').src :
      null,
    hasStdin: !!options.stdin
  }, [port2]);
  // Start the thread
  this[kHandle].startThread();
}

The main logic of the above code is as follows:

1 Save the messagePort, then send a message to the peer of that messagePort (see the structure above). There is no receiver yet at this point, so the message is cached in the peer's MessagePortData, that is, child_port_data_.

2 Create a communication channel for communication between the main thread and the child thread. parent_port_ and child_port_ are used internally by Node.js, while the newly created channel is for user code.

3 Create the child thread.
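The idea in steps 1 and 2, where a first message carries one end of a second channel to the child thread, can be reproduced from user code with public APIs. The sketch below is only an analogy for the internal bootstrap message: handshake is a hypothetical helper name, and the child source is passed inline with the eval option.

```javascript
const { Worker, MessageChannel } = require('worker_threads');

const childSource = `
  const { parentPort } = require('worker_threads');
  // The first message plays the role of the bootstrap message:
  // it carries one end of a second, dedicated channel.
  parentPort.once('message', ({ publicPort }) => {
    publicPort.postMessage('ready on the dedicated channel');
    publicPort.close();
  });
`;

// Hypothetical helper: hand a port to the child and wait for its first reply.
function handshake() {
  return new Promise((resolve, reject) => {
    const worker = new Worker(childSource, { eval: true });
    const { port1, port2 } = new MessageChannel();
    port1.once('message', resolve);
    worker.once('error', reject);
    // port2 is listed in the transfer list, so it moves to the child thread.
    worker.postMessage({ publicPort: port2 }, [port2]);
  });
}

handshake().then((message) => {
  console.log(message);
});
```

Keeping a separate channel for user messages means internal control messages and user messages never share a queue, which matches the split described in step 2.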

Let's look at what happens when the thread is created.

void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
  Worker* w;
  // Unwrap the corresponding Worker object
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
  // Create a new child thread; the Run function then executes inside that child thread
  uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) {
    Worker* w = static_cast<Worker*>(arg);
    w->Run();
  }, static_cast<void*>(w));
}

Next, let's look at Run.

void Worker::Run() {
  {
    // Create a new env
    env_.reset(new Environment(data.isolate_data_.get(),
                               context,
                               std::move(argv_),
                               std::move(exec_argv_),
                               Environment::kNoFlags,
                               thread_id_));
    // Initialize the libuv handles and register them with libuv
    env_->InitializeLibuv(start_profiler_idle_notifier_);
    // Create a MessagePort
    CreateEnvMessagePort(env_.get());
    // Execute internal/main/worker_thread.js
    StartExecution(env_.get(), "internal/main/worker_thread");
    // Start the event loop
    bool more;
    do {
      uv_run(&data.loop_, UV_RUN_DEFAULT);
      platform_->DrainTasks(isolate_);
      more = uv_loop_alive(&data.loop_);
      if (more && !is_stopped()) continue;
      more = uv_loop_alive(&data.loop_);
    } while (more == true && !is_stopped());
  }
}

Let's analyze the above code step by step

1 CreateEnvMessagePort

void Worker::CreateEnvMessagePort(Environment* env) {
  child_port_ = MessagePort::New(env,
                                 env->context(),
                                 std::move(child_port_data_));
  if (child_port_ != nullptr)
    env->set_message_port(child_port_->object(isolate_));
}

The variable child_port_data_ should look familiar. A new port is created here, and the object responsible for managing that port's data is child_port_data_, which may already hold the message sent by the main thread. The port is then cached in env, where it will be used later.
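The fact that a message can be sent before anyone is receiving on the other side can be observed with the public MessageChannel API. The sketch below is an analogy in a single thread, not the internal code path; postThenReceive is a hypothetical helper name.

```javascript
const { MessageChannel, receiveMessageOnPort } = require('worker_threads');

// Hypothetical helper: post before anyone listens, then read the queue.
function postThenReceive(payload) {
  const { port1, port2 } = new MessageChannel();
  // Nobody is listening on port2 yet; the message is queued on its side.
  port1.postMessage(payload);
  // Later, the receiving side picks the queued message up synchronously.
  const received = receiveMessageOnPort(port2);
  // With the queue drained, a second read finds nothing.
  const empty = receiveMessageOnPort(port2);
  port1.close();
  return { received, empty };
}

const result = postThenReceive({ type: 'LOAD_SCRIPT' });
console.log(result.received); // prints { message: { type: 'LOAD_SCRIPT' } }
console.log(result.empty);    // prints undefined
```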

2 Execute internal/main/worker_thread.js

// Set up the process object
patchProcessObject();
// Get the port that was just cached
const port = getEnvMessagePort();
port.on('message', (message) => {
  // Load the script
  if (message.type === LOAD_SCRIPT) {
    const {
      argv,
      cwdCounter,
      filename,
      doEval,
      workerData,
      publicPort,
      manifestSrc,
      manifestURL,
      hasStdin
    } = message;
    const CJSLoader = require('internal/modules/cjs/loader');
    loadPreloadModules();
    /*
      publicPort is one end of the MessageChannel created by the main thread.
      Set it as the parentPort field of publicWorker. publicWorker is the
      object exported by worker_threads; it is used later.
    */
    publicWorker.parentPort = publicPort;
    // Data used during execution
    publicWorker.workerData = workerData;
    // Notify the main thread that the script is being executed
    port.postMessage({ type: UP_AND_RUNNING });
    // Run the file passed in when new Worker(filename) was executed
    CJSLoader.Module.runMain(filename);
  }
});
// Start receiving messages
port.start();

At this point we can see the whole picture: when we call new Worker(filename), our file ends up being executed in the child thread. Let's review the earlier code again.

const { Worker, isMainThread, parentPort } = require('worker_threads');
if (isMainThread) {
  const worker = new Worker(__filename);
  worker.once('message', (message) => {
    ...
  });
  worker.postMessage('Hello, world!');
} else {
  // do something time-consuming
  parentPort.once('message', (message) => {
    parentPort.postMessage(message);
  });
}

We now know that in the child thread isMainThread is false and parentPort is one end of the MessageChannel. So when parentPort.postMessage sends a message to its peer, it is sending a message to the main thread. Next, let's look at worker.postMessage('Hello, world!').

postMessage(...args) {
  this[kPublicPort].postMessage(...args);
}

kPublicPort refers to the other end of the MessageChannel, so this sends a message to the child thread. In the same way, on('message') receives the messages sent by the peer.
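As a rough model of this forwarding, the sketch below builds a small wrapper around a MessageChannel. It is not the Node.js implementation: MiniWorker, echoChild and roundTrip are hypothetical names, and no real thread is started; the "child" is just a function that receives the other port in the same thread.

```javascript
const { EventEmitter } = require('events');
const { MessageChannel } = require('worker_threads');

const kPublicPort = Symbol('kPublicPort');

// Toy wrapper: owns one end of a channel and hides it behind postMessage/on.
class MiniWorker extends EventEmitter {
  constructor(startChild) {
    super();
    const { port1, port2 } = new MessageChannel();
    this[kPublicPort] = port1;
    // Re-emit messages arriving on the port as 'message' events on the wrapper.
    this[kPublicPort].on('message', (message) => this.emit('message', message));
    // Hand the other end to the "child" side (here: a plain function).
    startChild(port2);
  }

  postMessage(...args) {
    this[kPublicPort].postMessage(...args);
  }

  close() {
    this[kPublicPort].close();
  }
}

// Plays the role of the user script in the child: echo the first message.
function echoChild(parentPort) {
  parentPort.once('message', (message) => parentPort.postMessage(message));
}

// Hypothetical helper: send one message and resolve with the echoed reply.
function roundTrip(message) {
  return new Promise((resolve) => {
    const worker = new MiniWorker(echoChild);
    worker.once('message', (reply) => {
      worker.close(); // closing one end also closes the other
      resolve(reply);
    });
    worker.postMessage(message);
  });
}

roundTrip('Hello, world!').then((reply) => console.log(reply));
```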

That concludes this walkthrough of how worker threads are implemented in the Node.js source code. I hope it has been helpful and that you have learned something from it. If you found the article useful, feel free to share it so more people can see it.
