In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-04 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
This article will explain in detail how to use nodejs source code to analyze threads, the content of the article is of high quality, so the editor will share it for you as a reference. I hope you will have a certain understanding of the relevant knowledge after reading this article.
Let's take a look at a general example of usage.
Const {Worker, isMainThread, parentPort} = require ('worker_threads')
If (isMainThread) {
Const worker = new Worker (_ _ filename)
Worker.once ('message', (message) = > {
...
});
Worker.postMessage ('Hello, Worldwide')
} else {
/ / do something time-consuming
ParentPort.once ('message', (message) = > {
ParentPort.postMessage (message)
});
}
Let's analyze the meaning of this code first. Because the above code is executed once in both the main thread and the child thread. So first of all, through the isMainThread to determine whether the current main thread or child thread. In the case of the main thread, create a child thread and then listen for messages sent by the child thread. If the child thread, first execute the business-related code, but also can listen to the message passed by the main thread. Let's start to analyze the source code. After the analysis, there will be more understanding of the above code.
First of all, let's start with the worker_threads module. This is a C++ module. Let's take a look at the functions he exported. Require ("work_threads") refers to the export function of the InitWorker function.
Void InitWorker (Local target
Local unused
Local context
Void* priv) {
Environment* env = Environment::GetCurrent (context)
{
/ / when the following method is executed, the input parameters are all objects from w-> GetFunction () new.
/ / create a new function template. Worker::New is the callback that is executed when new is executed on w-> GetFunction ().
Local w = env- > NewFunctionTemplate (Worker::New)
/ / set the memory that needs to be expanded, because the memory of C++ object is fixed
W-> InstanceTemplate ()-> SetInternalFieldCount (1)
W-> Inherit (AsyncWrap::GetConstructorTemplate (env))
/ / if you set a series of prototype methods, you will not enumerate them all.
Env- > SetProtoMethod (w, "setEnvVars", Worker::SetEnvVars)
/ / A series of prototype methods
/ / Export the function corresponding to the function module, that is, the Worker in our code const {Worker} = require ("worker_threads");
Local workerString =
FIXED_ONE_BYTE_STRING (env- > isolate (), "Worker")
W-> SetClassName (workerString)
Target- > Set (env- > context ()
WorkerString
W-> GetFunction (env- > context ()). ToLocalChecked (). Check ()
}
/ / Export getEnvMessagePort method, const {getEnvMessagePort} = require ("worker_threads")
Env- > SetMethod (target, "getEnvMessagePort", GetEnvMessagePort)
/ *
Thread id, which is not assigned by the operating system, but assigned by nodejs, is set when a new thread is opened
Const {threadId} = require ("worker_threads")
, /
Target
-> Set (env- > context ()
Env- > thread_id_string ()
Number::New (env- > isolate (), static_cast (env- > thread_id ()
.Check ()
/ *
Whether it is the main thread, const {isMainThread} = require ("worker_threads")
The variable here is set to true when nodejs starts, but not set when a new child thread is opened, so it is false.
, /
Target
-> Set (env- > context ()
FIXED_ONE_BYTE_STRING (env- > isolate (), "isMainThread")
Boolean::New (env- > isolate (), env- > is_main_thread ())
.Check ()
/ *
If it is not the main thread, export the configuration of resource limits
That is, to call const {resourceLimits} = require ("worker_threads") in the child thread
, /
If (! env- > is_main_thread ()) {
Target
-> Set (env- > context ()
FIXED_ONE_BYTE_STRING (env- > isolate (), "resourceLimits")
Env- > worker_context ()-> GetResourceLimits (env- > isolate ())
.Check ()
}
/ / Export several constants
NODE_DEFINE_CONSTANT (target, kMaxYoungGenerationSizeMb)
NODE_DEFINE_CONSTANT (target, kMaxOldGenerationSizeMb)
NODE_DEFINE_CONSTANT (target, kCodeRangeSizeMb)
NODE_DEFINE_CONSTANT (target, kTotalResourceLimitCount)
}
Translated into js is probably
Function c++Worker (object) {
/ / related, later, when the C++ layer function is called in the js layer, take it out and get the real worker object in the C++ layer
Object [0] = this
...
}
Function New (object) {
Const worker = new c++Worker (object)
}
Function Worker () {
New (this)
}
Worker.prototype = {
StartThread,StartThread
StopThread: StopThread
...
}
Module.exports = {
Worker: Worker
GetEnvMessagePort: GetEnvMessagePort
IsMainThread: true | false
...
}
After understanding the export capabilities of the work_threads module, we look at the logic of new Worker. According to the logic derived from the above code, we know that a new C++ object will be created first at this time. Corresponds to the this in the Worker function above. Then execute the New callback and pass in the tihs. Let's look at the logic of the New function. We omit a series of parameter handling, the main code is as follows.
/ / args.This () is the this we just passed in.
Worker* worker = new Worker (env, args.This ()
Url, per_isolate_opts
Std::move (exec_argv_out))
Let's take a look at the Worker class.
Worker::Worker (Environment* env
Local wrap,...)
/ / complete the association of the object's Worker object and args.This () object in the parent class constructor
: AsyncWrap (env, wrap, AsyncWrap::PROVIDER_WORKER)
...
/ / assign thread id
Thread_id_ (Environment::AllocateThreadId ())
Env_vars_ (env- > env_vars ()) {
/ / create a new port to communicate with child threads
Parent_port_ = MessagePort::New (env, env- > context ())
/ *
To associate with, for communication.
Const parent_port_ = {data: {sibling: null}}
Const child_port_data_ = {sibling: null}
Parent_port_.data.sibling = child_port_data_
Child_port_data_.sibling = parent_port_.data
, /
Child_port_data_ = std::make_unique (nullptr)
MessagePort::Entangle (parent_port_, child_port_data_.get ())
/ / set the messagePort property of the Worker object to parent_port_
Object ()-> Set (env- > context ()
Env- > message_port_string ()
Parent_port_- > object (). Check ()
/ / set the thread id of the Worker object, that is, the threadId property
Object ()-> Set (env- > context ()
Env- > thread_id_string ()
Number::New (env- > isolate (), static_cast (thread_id_)
.Check ()
}
Create a new Worker with the following structure
Now that we understand the logic of new Worker, we look at how it is used in the js layer. Let's look at the constructor of the Worker class in the js layer. Constructor (filename, options = {}) {
Super ()
/ / ignore a series of parameter handling. New Worker is the one of C++ layer mentioned above.
This [kHandle] = new Worker (url, options.execArgv, parseResourceLimits (options.resourceLimits))
/ / messagePort is the messagePort in the figure above, pointing to _ parent_port
This [kPort] = This [kHandle] .messagePort
This [KPort] .on ('message', (data) = > this [kOnMessage] (data))
/ / start to receive messages. We will not go deep into messagePort here. We will analyze them separately later.
This [kPort] .start ()
/ / apply for a communication channel with two ports
Const {port1, port2} = new MessageChannel ()
This [kPublicPort] = port1
Th [kPublicPort] .on ('message', (message) = > this.emit (' message', message))
/ / send a message to the other end
This [kPort] .postMessage ({
Argv
Type: messageTypes.LOAD_SCRIPT
Filename
DoEval:!! options.eval
CwdCounter: cwdCounter | | workerIo.sharedCwdCounter
WorkerData: options.workerData
PublicPort: port2
ManifestSrc: getOptionValue ('--experimental-policy')?
Require ('internal/process/policy') .src:
Null
HasStdin:!! options.stdin
}, [port2])
/ / start the thread
This [kHandle] .startThread ()
}
The main logic of the above code is as follows
1 save the messagePort, and then send the message to the peer of the messagePort (see the figure above), but there is no receiver at this time, so the message is cached in the MessagePortData, that is, child_port_data_.
2 apply for a communication pipeline for communication between main thread and child thread. _ parent_port and child_port are for nodejs, and the newly applied pipeline is for users.
3 create child threads.
We look at what we did when we created the thread.
Void Worker::StartThread (const FunctionCallbackInfo& args) {
Worker* w
/ / unpack the corresponding Worker object
ASSIGN_OR_RETURN_UNWRAP (& w, args.This ())
/ / create a new child thread, then execute the Run function, and then execute it in the child thread
Uv_thread_create_ex (& w-> tid_, & thread_options, [] (void* arg) {
W-> Run ()
}, static_cast (w))
}
Let's keep watching Run.
Void Worker::Run () {
{
/ / create a new env
Env_.reset (new Environment (data.isolate_data_.get))
Context
Std::move (argv_)
Std::move (exec_argv_)
Environment::kNoFlags
Thread_id_))
/ / initialize libuv and register with libuv
Env_- > InitializeLibuv (start_profiler_idle_notifier_)
/ / create a MessagePort
CreateEnvMessagePort (env_.get ())
/ / execute internal/main/worker_thread.js
StartExecution (env_.get (), "internal/main/worker_thread")
/ / start the event loop
Do {
Uv_run & data.loop_, UV_RUN_DEFAULT)
Platform_- > DrainTasks (isolate_)
More = uv_loop_alive (& data.loop_)
If (more & &! is_stopped ()) continue
More = uv_loop_alive (& data.loop_)
} while (more = = true & &! is_stopped ())
}
}
Let's analyze the above code step by step
1 CreateEnvMessagePort
Void Worker::CreateEnvMessagePort (Environment* env) {
Child_port_ = MessagePort::New (env
Env- > context ()
Std::move (child_port_data_))
If (child_port_! = nullptr)
Env- > set_message_port (child_port_- > object (isolate_))
}
The variable child_port_data_ should be familiar to us, so we should first apply for a new port here. The object responsible for data management in the port is child_port_data_. And then cached in env. I'll use it later.
2 execute internal/main/worker_thread.js// to set process object
PatchProcessObject ()
/ / get the port that was just cached
Onst port = getEnvMessagePort ()
Port.on ('message', (message) = > {
/ / load script
If (message.type = LOAD_SCRIPT) {
Const {
Argv
CwdCounter
Filename
DoEval
WorkerData
PublicPort
ManifestSrc
ManifestURL
HasStdin
} = message
Const CJSLoader = require ('internal/modules/cjs/loader')
LoadPreloadModules ()
/ *
The port at one end of the MessageChannel pipe requested by the main thread
Set the parentPort field of publicWorker. PublicWorker is the object exported by worker_threads. Later, you need to use
, /
PublicWorker.parentPort = publicPort
/ / data used during execution
PublicWorker.workerData = workerData
/ / notify the main thread that the script is being executed
Port.postMessage ({type: UP_AND_RUNNING})
/ / the file passed in when new Worker (filename) is executed
CJSLoader.Module.runMain (filename)
})
/ / start receiving messages
Port.start ()
At this point, let's look back at the scenario when we call new Worker (filename) and then execute our filename in the child thread. Let's review the previous code again.
Const {Worker, isMainThread, parentPort} = require ('worker_threads')
If (isMainThread) {
Const worker = new Worker (_ _ filename)
Worker.once ('message', (message) = > {
...
});
Worker.postMessage ('Hello, Worldwide')
} else {
/ / do something time-consuming
ParentPort.once ('message', (message) = > {
ParentPort.postMessage (message)
});
}
We know that isMainThread is false,parentPort or one end of messageChannel in a subthread. So when parentPort.postMessage sends a message to the peer, it sends a message to the main thread. Let's take a look at worker.postMessage ('Hello, worldview').
PostMessage (. Args) {
This [kPublicPort] .postMessage (... args)
}
KPublicPort points to the other end of the messageChannel. That is, a message is sent to the child thread. Then the on ('message') receives the message sent by the peer.
On how to use the nodejs source code analysis thread to share here, I hope the above content can be of some help to you, can learn more knowledge. If you think the article is good, you can share it for more people to see.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.