This article explains how to understand streams in NodeJS. The content is simple and clear and should be easy to learn and understand.
How to understand streams
For consumers of a stream, we can think of the stream as an array: we only need to focus on reading from it (consuming) and writing to it (producing).
For developers of a stream (those using the stream module to create a new instance), the focus is on how to implement the stream's methods, which usually comes down to two points: what the target resource is and how to operate on it. Once that is determined, the stream must operate on the target resource according to its different states and events.
Buffer pool
All streams in NodeJS have buffer pools. The purpose of a buffer pool is to increase the efficiency of a stream: when producing and consuming data both take time, we can produce data ahead of the next consumption and store it in the buffer pool. The buffer pool is not always in use, though; for example, when the buffer pool is empty, newly produced data is not put into the buffer pool but is consumed directly.
If data is produced faster than it is consumed, the excess data has to wait somewhere. If data is produced slower than it is consumed, the data accumulates somewhere until there is enough of it, and is then consumed. (Developers cannot control the speed of production and consumption; they can only try to produce or consume data as soon as possible.)
The place where data waits, accumulates, and is then handed out is the buffer pool. The buffer pool is usually located in the computer's RAM (memory).
To take a familiar example of buffering: when we watch a video online, if your network is fast, the buffer is always filled immediately, the data is sent to the player, and the next segment is buffered right away, so playback never stutters. If the network is slow, you will see a loading indicator, which means the buffer is being filled; only after it is filled and the data is sent to the player can the video be watched.
The buffer pool of a NodeJS stream is a linked list of Buffers. Every time data is added to the buffer pool, a new Buffer node is created and inserted at the tail of the list.
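To make the idea concrete, here is a minimal sketch of such a linked list. It is a simplified illustration, not Node's actual internal BufferList implementation; the class and field names are made up.

class SimpleBufferList {
  constructor() {
    this.head = null
    this.tail = null
    this.length = 0 // total number of bytes currently buffered
  }

  // adding data creates a new node and appends it to the tail of the list
  push(chunk) {
    const node = { data: Buffer.from(chunk), next: null }
    if (this.tail) this.tail.next = node
    else this.head = node
    this.tail = node
    this.length += node.data.length
  }

  // consuming data removes nodes from the head of the list
  shift() {
    if (!this.head) return null
    const { data } = this.head
    this.head = this.head.next
    if (!this.head) this.tail = null
    this.length -= data.length
    return data
  }
}

const pool = new SimpleBufferList()
pool.push('hello')
pool.push('world')
console.log(pool.length)             // 10
console.log(pool.shift().toString()) // hello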
EventEmitter
Stream in NodeJs is an abstract interface that implements EventEmitter, so I'll briefly introduce EventEmitter first.
EventEmitter is a class that implements the function of event publish and subscribe, of which several commonly used methods (on, once, off, emit) are familiar to everyone, so they will not be introduced one by one.
const { EventEmitter } = require('events')

const eventEmitter = new EventEmitter()

// bind a handler to the eventA event
eventEmitter.on('eventA', () => {
  console.log('eventA active 1');
});

// bind a handler to the eventB event
eventEmitter.on('eventB', () => {
  console.log('eventB active 1');
});

eventEmitter.once('eventA', () => {
  console.log('eventA active 2');
})

// trigger eventA
eventEmitter.emit('eventA')
// eventA active 1
// eventA active 2
It is worth noting that EventEmitter has two built-in events called newListener and removeListener. When a listener is added to any event, newListener is emitted (via eventEmitter.emit('newListener')), and when a handler is removed, removeListener is emitted.
It is also worth noting that a handler bound with once is executed only once, and removeListener is emitted before it runs, which means the once-bound listener is removed first and then triggered.
const { EventEmitter } = require('events')

const eventEmitter = new EventEmitter()

eventEmitter.on('newListener', (event, listener) => {
  console.log('newListener', event, listener)
})
eventEmitter.on('removeListener', (event, listener) => {
  console.log('removeListener', event, listener)
})
// newListener removeListener [Function (anonymous)]

eventEmitter.on('eventA', () => {
  console.log('eventA active 1');
})
// newListener eventA [Function (anonymous)]

function listenerB() { console.log('eventB active 1'); }
eventEmitter.on('eventB', listenerB);
// newListener eventB [Function: listenerB]

eventEmitter.once('eventA', () => {
  console.log('eventA active 2');
})
// newListener eventA [Function (anonymous)]

eventEmitter.emit('eventA')
// eventA active 1
// removeListener eventA [Function: bound onceWrapper] { listener: [Function (anonymous)] }
// eventA active 2

eventEmitter.off('eventB', listenerB)
// removeListener eventB [Function: listenerB]
But this is not important for the rest of our content.
Stream
Stream is the abstract interface for handling streaming data in Node.js. Stream itself is not a concrete interface but a general term for all streams; the concrete interfaces are ReadableStream, WritableStream and ReadWriteStream.
interface ReadableStream extends EventEmitter {
  readable: boolean;
  read(size?: number): string | Buffer;
  setEncoding(encoding: BufferEncoding): this;
  pause(): this;
  resume(): this;
  isPaused(): boolean;
  pipe<T extends WritableStream>(destination: T, options?: { end?: boolean | undefined; }): T;
  unpipe(destination?: WritableStream): this;
  unshift(chunk: string | Uint8Array, encoding?: BufferEncoding): void;
  wrap(oldStream: ReadableStream): this;
  [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>;
}

interface WritableStream extends EventEmitter {
  writable: boolean;
  write(buffer: Uint8Array | string, cb?: (err?: Error | null) => void): boolean;
  write(str: string, encoding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean;
  end(cb?: () => void): this;
  end(data: string | Uint8Array, cb?: () => void): this;
  end(str: string, encoding?: BufferEncoding, cb?: () => void): this;
}

interface ReadWriteStream extends ReadableStream, WritableStream { }
You can see that both ReadableStream and WritableStream extend the EventEmitter class's interface (an interface in TypeScript can extend a class because doing so just merges the types).
The implementation classes corresponding to the above interfaces are Readable, Writable and Duplex, respectively.
There are four types of streams in NodeJs:
Readable readable stream (implements ReadableStream)
Writable writable stream (implements WritableStream)
Duplex readable and writable stream (inherits Readable and implements WritableStream)
Transform transform stream (inherits Duplex)
Back pressure problem
The speed at which a disk writes data is much lower than that of memory. Imagine there is a "pipe" between memory and disk, and the "pipe" is the "stream": data in memory flows into the pipe very quickly, and when the pipe is full, a backlog of data builds up in memory, taking up resources.
The solution in NodeJS streams is to set a high-water mark (highWaterMark) for each stream's buffer pool (the write queue). When the amount of data reaches this high-water mark, pushing more data into the buffer pool returns false, indicating that the buffer pool in the current stream is full and no more data should be written. In that case we should stop producing data immediately to prevent back pressure caused by the buffer pool.
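As a hedged sketch of what respecting back pressure looks like in practice (the file name and the amount of data are made up for illustration):

const fs = require('fs')

const writable = fs.createWriteStream('./big-file.txt', { highWaterMark: 16 * 1024 })

let i = 0
function writeAsMuchAsPossible() {
  let ok = true
  while (i < 1000000 && ok) {
    ok = writable.write('line ' + i + '\n') // false once the buffer pool reaches the high-water mark
    i++
  }
  if (i >= 1000000) {
    writable.end()
  }
  // otherwise: stop producing and wait for the buffer pool to drain
}

writeAsMuchAsPossible()
writable.on('drain', writeAsMuchAsPossible) // the buffer pool is empty again, safe to keep producing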
Readable
Readable stream (Readable) is a type of stream. It has two modes and three states.
Two read modes:
Flow mode: data is read from the underlying system into the buffer automatically, and is passed to the registered event handlers through the EventEmitter as quickly as possible.
Pause mode: in this mode the EventEmitter does not actively push data out; you must explicitly call the Readable.read() method to read data from the buffer, and read triggers the corresponding EventEmitter events.
Three states:
readableFlowing = null (initial state)
readableFlowing = false (pause mode)
readableFlowing = true (flow mode)
A stream's readable.readableFlowing is null initially.
It becomes true after a data event listener is added. Calling pause() or unpipe(), receiving back pressure, or adding a readable event listener sets readableFlowing to false; in this state, binding a listener to the data event will not switch readableFlowing back to true.
Calling resume() switches the readable stream's readableFlowing back to true.
Removing all readable event listeners is the only way to make readableFlowing become null again.
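A small sketch of these state transitions; Readable.from is only used here to build a throwaway stream, and the commented values follow the state rules described above:

const { Readable } = require('stream')

const myReadable = Readable.from(['a', 'b', 'c'])

console.log(myReadable.readableFlowing) // null (initial state)

myReadable.on('data', data => console.log('data:', data))
console.log(myReadable.readableFlowing) // true (flow mode)

myReadable.pause()
console.log(myReadable.readableFlowing) // false (pause mode)

myReadable.resume()
console.log(myReadable.readableFlowing) // true (flow mode again)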
Event name / description:
readable: triggered when there is new readable data in the buffer (triggered every time a node is inserted into the buffer pool)
data: triggered after each piece of data is consumed; the parameter is the data consumed this time
close: triggered when the stream is closed
error: triggered when an error occurs on the stream

Method name / description:
read(size): consumes data of length size; returns null if the data currently in the buffer pool is shorter than size, otherwise returns the consumed data; if size is not passed, all data in the buffer pool is consumed

const fs = require('fs')

const readStreams = fs.createReadStream('./EventEmitter.js', {
  highWaterMark: 100 // the buffer pool's high-water mark
})

readStreams.on('readable', () => {
  console.log('buffer full')
  readStreams.read() // consume all data in the buffer pool, return it, and trigger the data event
})

readStreams.on('data', (data) => {
  console.log('data')
})
https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527
The readable event is triggered when size is 0.
When the length of the data in the buffer pool reaches the high-water mark highWaterMark, the stream does not actively request more data from the source; instead it waits for data to be consumed first.
If a paused stream never calls read to consume data, data and readable will not fire again. When read is called to consume, it first checks whether the remaining data length after this consumption would fall below the high-water mark; if so, it requests more data from the source before consuming. This way, by the time the logic that runs after read has finished, the new data has most likely already been produced, and readable fires again. This mechanism of producing the data for the next consumption in advance and storing it in the buffer pool is also why streams are fast.
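A small sketch of that refill mechanism, using a made-up counter-based source similar to the examples later in this article; readableLength reports how much data is currently sitting in the buffer pool:

const { Readable } = require('stream')

let count = 1000
const myReadable = new Readable({
  highWaterMark: 300,
  read(size) {
    // produce data asynchronously, as an IO source would
    setTimeout(() => {
      let chunk = null
      if (count > 0) {
        const chunkLength = Math.min(count, size)
        chunk = '1'.repeat(chunkLength)
        count -= chunkLength
      }
      this.push(chunk)
    })
  }
})

myReadable.on('readable', () => {
  console.log('before read:', myReadable.readableLength) // data waiting in the buffer pool
  myReadable.read() // consuming drops below the high-water mark, so _read is requested again
  console.log('after read:', myReadable.readableLength)
})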
In the flow state there are two cases.
When production is slower than consumption: in this case there is no leftover data in the buffer pool after each piece of data is produced, so the produced data can be handed directly to the data event (it never enters the buffer pool, so there is no need to call read to consume it), and production of the next piece of data starts immediately. The next piece of data is only produced after the previous one has been consumed, data fires again, and so on until the stream ends.
When production is faster than consumption: in this case there is still unconsumed data in the buffer pool after each piece is produced. Here, the data for the next consumption is produced while the current data is being consumed, so by the time the old data has been consumed, the new data has already been produced and put into the buffer pool.
The only difference between the two cases is whether there is still data in the buffer pool after production: if there is, the produced data is pushed to the buffer pool to wait for consumption; if there is not, the data is handed directly to data without entering the buffer pool.
It is worth noting that when a stream whose buffer pool still contains data switches from pause mode to flow mode, read is called in a loop to consume the data until null is returned.
Pause mode
A readable stream is in pause mode when it is created. After creation, the _read method is called automatically to push data from the data source into the buffer pool, until the data in the buffer pool reaches the high-water mark. Whenever the data reaches the high-water mark, the readable stream emits a "readable" event, telling the consumer that data is ready and can be consumed.
In general, the 'readable' event indicates new activity in the stream: either new data has arrived or the end of the stream has been reached. So a 'readable' event is also emitted once the data source has been read to the end.
In the consumer's "readable" event handler, the data in the buffer pool is actively consumed with stream.read(size).
const { Readable } = require('stream')

let count = 1000

const myReadable = new Readable({
  highWaterMark: 300,
  // the read option is used as the stream's _read method to fetch source data
  read(size) {
    // assume our data source is 1000 "1"s
    let chunk = null
    // reading source data is usually asynchronous, e.g. an IO operation
    setTimeout(() => {
      if (count > 0) {
        let chunkLength = Math.min(count, size)
        chunk = '1'.repeat(chunkLength)
        count -= chunkLength
      }
      this.push(chunk)
    })
  }
})

myReadable.on('readable', () => {
  const chunk = myReadable.read() // consume all data currently in the buffer pool
  if (chunk) {                    // read() returns null once the stream has ended
    console.log(chunk.toString())
  }
})
It is worth noting that if the size passed to read(size) is greater than the high-water mark, a new high-water mark is computed. The new high-water mark is the next power of two greater than or equal to size (capped at 1GB).

function computeNewHighWaterMark(n) {
  if (n >= MAX_HWM) {
    // 1GB limit
    n = MAX_HWM;
  } else {
    // take the next highest power of 2 to prevent increasing hwm excessively in tiny amounts
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    n++;
  }
  return n;
}

Flow mode
All readable streams start in pause mode and can be switched to flow mode in the following ways:
Add a "data" event handle
Call the "resume" method
Use the "pipe" method to send data to a writable stream
In flow mode, the data in the buffer pool is automatically output to the consumer. After each output, the _read method is called automatically to put data from the data source into the buffer pool. If there is no data in the buffer pool, the data is passed directly to the data event without going through the buffer pool. This continues until flow mode switches back to pause mode, or the data source has been read to the end (push(null)).
Readable streams can switch back to pause mode in the following ways:
If there are no pipe targets, by calling stream.pause().
If there are pipe targets, by removing all of them; multiple pipe targets can be removed by calling stream.unpipe() (see the sketch below).
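As a rough sketch of the pipe case (source and sink are throwaway streams made up for illustration; the commented readableFlowing values follow the state rules described earlier):

const { Readable, Writable } = require('stream')

const source = Readable.from(['a', 'b', 'c'])
const sink = new Writable({
  write(chunk, encoding, callback) {
    console.log('sink received:', chunk.toString())
    callback()
  }
})

source.pipe(sink)                   // piping switches the source into flow mode
console.log(source.readableFlowing) // true

source.unpipe(sink)                 // removing the only pipe target...
console.log(source.readableFlowing) // ...switches it back to pause mode (false)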
const { Readable } = require('stream')

let count = 1000

const myReadable = new Readable({
  highWaterMark: 300,
  read(size) {
    let chunk = null
    setTimeout(() => {
      if (count > 0) {
        let chunkLength = Math.min(count, size)
        chunk = '1'.repeat(chunkLength)
        count -= chunkLength
      }
      this.push(chunk)
    })
  }
})

myReadable.on('data', data => {
  console.log(data.toString())
})

Writable
Writable streams are simpler than readable streams.
When the producer calls write(chunk), the stream decides, based on internal state (corked, writing, etc.), whether to cache the data in the buffer queue or to call _write. After each write it tries to empty the buffer queue. If the amount of data in the buffer queue exceeds the high-water mark (highWaterMark), write(chunk) returns false and the producer should stop writing.
So when can writing continue? Once all the data in the buffer has been written by _write and the buffer queue is empty, the drain event is emitted, and the producer can continue writing.
When the producer has finished writing data, it needs to call the stream.end method to signal the end of the writable stream.
const { Writable } = require('stream')

let fileContent = ''
const myWritable = new Writable({
  highWaterMark: 10,
  write(chunk, encoding, callback) { // used as the _write method
    setTimeout(() => {
      fileContent += chunk
      callback() // called after the write finishes
    }, 500)
  }
})

myWritable.on('close', () => {
  console.log('close', fileContent)
})

myWritable.write('123123') // true
myWritable.write('123123') // false
myWritable.end()
Note that after the data in the buffer pool reaches the high-water mark, there may be multiple nodes in the buffer pool. While the buffer pool is being emptied (by calling _write in a loop), it does not consume a high-water mark's worth of data at a time the way a readable stream does; instead it consumes one buffer node at a time, even if that node's length differs from the high-water mark.
const { Writable } = require('stream')

let fileContent = ''
const myWritable = new Writable({
  highWaterMark: 10,
  write(chunk, encoding, callback) {
    setTimeout(() => {
      fileContent += chunk
      console.log('consumed', chunk.toString())
      callback() // called after the write finishes
    }, 100)
  }
})

myWritable.on('close', () => {
  console.log('close', fileContent)
})

let count = 0
function productionData() {
  let flag = true
  while (count <= 20 && flag) {
    flag = myWritable.write(count + '')
    count++
  }
  if (count > 20) {
    myWritable.end()
  }
}

productionData()
myWritable.on('drain', productionData)
The above is a writable stream with a high-water mark of 10; the data source is the numbers 0 to 20 written as strings, and productionData is used to write the data.
When myWritable.write ("0") is called for the first time, because there is no data in the cache pool, "0" does not enter the cache pool, but is sent directly to _ wirte,myWritable.write ("0"). The return value is true.
When executing myWritable.write ("1"), because the callback of _ wirte has not been called, it indicates that the last data has not been written, and the location ensures the order of data writing, so a buffer can only be created to add "1" to the cache pool. The same is true of the latter 2-9.
When myWritable.write ("10") is executed, the buffer length is 9 (1-9), and the buoy value has not yet been reached. "10" continues to join the cache pool as a buffer, and the cache pool length becomes 11, so myWritable.write ("1") returns false, which means that there is enough data in the buffer, and we need to wait for the drain event notification to reproduce the data.
After 100ms, the callback of _ write ("0", encoding, callback) is called, indicating that "0" has been written. It then checks whether there is data in the cache pool, and if so, first calls the header node of the read consumption cache pool ("1"), and then repeats the process until the drain event is triggered after the cache pool is empty, executing the productionData again
Call myWritable.write ("11") to trigger the process that starts in step 1 until the end of the flow.
Duplex
After understanding readable and writable streams, a duplex stream is easy to understand: it is in fact a readable stream that also implements a writable stream (that is how the source code is written, although it would be more accurate to say it implements a readable stream and a writable stream at the same time).
A duplex stream needs to implement the following two methods at the same time:
Implement the _read() method to produce data for the readable side
Implement the _write() method to consume data for the writable side
How to implement these two methods was covered in the readable stream and writable stream sections above. Note that a duplex stream has two separate buffer pools, one for each side, and their data sources are not the same.
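A minimal sketch of a hand-written duplex stream, using a made-up counter as the readable side's source and the console as the writable side's destination, to show that the two sides are independent:

const { Duplex } = require('stream')

let count = 0
const myDuplex = new Duplex({
  read(size) {
    // produce data for the readable side's buffer pool
    this.push(count < 3 ? 'chunk-' + count++ : null)
  },
  write(chunk, encoding, callback) {
    // consume data written to the writable side; the two sides never touch
    console.log('written to duplex:', chunk.toString())
    callback()
  }
})

myDuplex.on('data', data => console.log('read from duplex:', data.toString()))
myDuplex.write('hello') // goes to _write only, unrelated to what the readable side produces
myDuplex.end()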
Take the standard input and output stream of NodeJs as an example:
Its data event fires when we enter data in the console, which shows that it has readable-stream functionality; every time the user presses enter is equivalent to calling push on the readable side to produce a piece of data.
When we call its write method, we can also output content to the console, but the data event is not triggered. That means it also has writable-stream functionality, with an independent buffer; the implementation of its _write method is to display the text on the console.
// whenever the user enters data in the console (_read), the data event fires; this is the readable-stream side
process.stdin.on('data', data => {
  process.stdin.write(data);
})

// produce data to the standard input stream every second (this is the writable-stream side
// and is output directly to the console) without triggering data
setInterval(() => {
  process.stdin.write('data not entered by the user console')
}, 1000)

Transform
You can think of a Duplex stream as a readable stream that contains a writable stream. The two are independent, each with its own internal buffer, and read and write events occur independently.

                          Duplex Stream
                       ------------------|
                Read   <-----   External Source
        You            ------------------|
                Write  ----->   External Sink
                       ------------------|
Transform streams are also duplex, but their reading and writing are causally linked: the two ends of the duplex stream are connected by some transformation, so a read requires a write to have occurred first.

                         Transform Stream
                       ------------------|
        You    Write  ---->     ---->  Read    You
                       ------------------|
When creating a Transform stream, the most important thing is to implement the _transform method rather than _write or _read. Data written by the writable side is processed (consumed) in _transform and then produced for the readable side.
A Transform stream often also implements a flush method, which is called before the stream ends and is usually used to append something to the end of the stream, for example some compression metadata when compressing a file.

const { Transform } = require('stream')

const transform = new Transform({
  highWaterMark: 10,
  transform(chunk, encoding, callback) {
    // transform the data and call push to add the result to the buffer pool
    this.push(chunk.toString().replace('1', '@'))
    callback()
  },
  flush(callback) {
    // executed before end triggers; append something to the tail of the stream
    this.push('<<<') // placeholder content for illustration
    callback()
  }
})

let count = 0
function productionData() {
  let flag = true
  while (count <= 20 && flag) {
    flag = transform.write(count + '')
    count++
  }
  if (count > 20) {
    transform.end()
  }
}

productionData()
transform.on('drain', productionData)

let result = ''
transform.on('data', data => {
  result += data.toString()
})
transform.on('end', () => {
  console.log(result) // the written numbers with "1" replaced by "@", followed by the flush output
})