In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-18 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/03 Report--
Node.js in how to solve the "back pressure" problem, many novices are not very clear, in order to help you solve this problem, the following editor will explain for you in detail, people with this need can come to learn, I hope you can gain something.
What are the four stream of Node.js? What is the back pressure problem? How do I move something from A to B?
Lift it up, move it to your destination, and just put it down.
What if this thing weighs a ton?
Then move part of it.
In fact, IO is to move things, including the IO of the network, the IO of files, if the amount of data is small, then directly transfer all the content, but if there is a lot of content, one-time loading into memory will collapse, and the speed is slow, at this time can be part of the processing, this is the idea of flow.
Api,Node.js of stream is basically implemented in various languages, and stream api is more commonly used.
The intuitive feeling of flow
From one place to another, it is obvious that there are outflows and inflows, the outflow is the readable stream (readable), and the inflow side is the writable stream (writable).
Of course, there are also streams that can flow in and out. This is called duplex.
Since you can flow in and out, is it possible to convert the incoming content down and out again? this kind of flow is called transform.
The inflow and outflow of duplex flow do not need to be related, while the inflow and outflow of transform flow are related, which is the difference between the two.
Api of the stream
The stream provided by Node.js is the four types described above:
Const stream = require ('stream'); / / readable stream const Readable = stream.Readable;// writable stream const Writable = stream.Writable;// duplex flow const Duplex = stream.Duplex;// conversion stream const Transform = stream.Transform
They all have ways to implement:
Readable needs to implement the _ read method to return the content
Writable needs to implement the _ write method to accept the content
Duplex needs to implement _ read and _ write methods to accept and return content
Transform needs to implement the _ transform method to convert the accepted content and return it.
Let's take a look at it separately:
Readable
To implement the _ read method, Readable returns specific data through push.
Const Stream = require ('stream'); const readableStream = Stream.Readable (); readableStream._read = function () {this.push (' a vine in front of Amena,'); this.push ('the sprouting of Adong's green just sprouting,'); this.push ('carrying that heavy shell,'); this.push ('climb step by step.') This.push (null);} readableStream.on ('data', (data) = > {console.log (data.toString ())}); readableStream.on (' end', () = > {console.log ('done~');})
When push a null, it means to end the stream.
The execution results are as follows:
Readable can also be created by inheritance:
Const Stream = require ('stream'); class ReadableDong extends Stream.Readable {constructor () {super ();} _ read () {this.push (' a vine in front of Amena,'); this.push ('the sprouting of A Dong Dong green,'); this.push ('A Dong carries that heavy shell,'); this.push ('climb step by step.') This.push (null);}} const readableStream = new ReadableDong (); readableStream.on ('data', (data) = > {console.log (data.toString ())}); readableStream.on (' end', () = > {console.log ('done~');})
The readable stream generates content, so it can naturally be combined with the generator:
Const Stream = require ('stream'); class ReadableDong extends Stream.Readable {constructor (iterator) {super (); this.iterator = iterator;} _ read () {const next = this.iterator.next (); if (next.done) {return this.push (null) } else {this.push (next.value)}} function * songGenerator () {yield 'Amen's former vine,'; yield'A Dong's green just sprouted,'; yield'A Dong carried that heavy shell,'; yield 'climbed step by step.' ;} const songIterator = songGenerator (); const readableStream = new ReadableDong (songIterator); readableStream.on ('data', (data) = > {console.log (data.toString ())}); readableStream.on (' end', () = > {console.log ('done~');})
This is the readable stream, which returns the content by implementing the _ read method.
Writable
Writable implements the _ write method, which receives the written content.
Const Stream = require ('stream'); const writableStream = Stream.Writable (); writableStream._write = function (data, enc, next) {console.log (data.toString ()); / / write setTimeout (()) once per second (() = > {next ();}, 1000);} writableStream.on (' finish', () = > console.log ('done~')); writableStream.write (' Amena's former vine,') WritableStream.write ('A Dong Dong green just sprouted,'); writableStream.write ('A Dong carries that heavy shell,'); writableStream.write ('climb step by step.') ; writableStream.end ()
Receive the written content, print it out, and call next to process the next write, where the call to next is asynchronous and the frequency can be controlled.
After running for a while, it can really deal with the written content normally:
This is the writable stream, which handles the written content by implementing the _ write method.
Duplex
Duplex is readable and writable, so it is OK to implement _ read and _ write at the same time.
Const Stream = require ('stream'); var duplexStream = Stream.Duplex (); duplexStream._read = function () {this.push (' a vine in front of Amena,'); this.push ('the sprouting of Adong's green just sprouting,'); this.push ('carrying that heavy shell,'); this.push ('climb step by step.') This.push (null);} duplexStream._write = function (data, enc, next) {console.log (data.toString ()); next ();} duplexStream.on ('data', data = > console.log (data.toString (); duplexStream.on (' end', data = > console.log ('read done~')); duplexStream.write (' a vine in front of Amena,'); duplexStream.write ('fresh sprouting of Adong Adong green,') DuplexStream.write ('A Dong carries that heavy shell,'); duplexStream.write ('climb up step by step.') ; duplexStream.end (); duplexStream.on ('finish', data = > console.log (' write done~'))
It integrates the functions of Readable flow and Writable flow, which is called duplex flow Duplex.
Transform
Although the Duplex stream is readable and writable, there is no relationship between the two, and sometimes you need to convert the incoming content and then flow out, so you need to convert the stream Transform.
To implement the api of _ transform in the Transform stream, we implement the conversion flow that reverses the content:
Const Stream = require ('stream'); class TransformReverse extends Stream.Transform {constructor () {super ()} _ transform (buf, enc, next) {const res = buf.toString (). Split ('). Reverse (). Join (''); this.push (res) next ()}} var transformStream = new TransformReverse () TransformStream.on ('data', data = > console.log (data.toString () transformStream.on (' end', data = > console.log ('read done~')); transformStream.write (' A Vine in front of Amena'); transformStream.write ('A Dong's green just sprouted'); transformStream.write ('A Dong carries that heavy shell'); transformStream.write ('climb step by step') TransformStream.end () transformStream.on ('finish', data = > console.log (' write done~'))
After running for a while, the effect is as follows:
Pause and flow of flow
We get the content from the Readable stream and flow it into the Writable stream. The flow is achieved by implementing _ read and _ write on both sides.
Back pressure
But both read and write are asynchronous. What if the rates are not the same?
If the rate at which Readable reads data is greater than the rate at which Writable writes, some data will be accumulated in the buffer. If too much data is buffered, it will burst and data will be lost.
What if the rate at which Readable reads data is less than the rate at which Writable writes? It doesn't matter, at most there is a period of free time in the middle.
This phenomenon in which the reading rate is greater than the writing rate is called "back pressure", or "negative pressure". It is also easy to understand that the pressure of the write section is relatively high, so it can not be written in, which will burst the buffer and lead to data loss.
The buffer size, which can be viewed through readableHighWaterMark and writableHightWaterMark, is 16k.
Solve the back pressure
How to solve the problem of inconsistent reading and writing rate?
When you are not finished, just pause the reading. In this way, more and more data will not be read and reside in the buffer.
Readable stream has a readableFlowing attribute that indicates whether the data is automatically read, which defaults to true, that is, automatically read the data, and then listen to the data event to get it.
When readableFlowing is set to false, it will not be read automatically and needs to be manually read through read.
ReadableStream.readableFlowing = false;let data;while ((data = readableStream.read ())! = null) {console.log (data.toString ());}
However, it is troublesome to manually read ourselves. We can still use automatic inflow and call pause and resume to pause and resume.
When the write method of writable stream is called, a boolean value is returned indicating whether the target is written or placed in a buffer:
True: the data has been written to the destination
False: the destination is not writable and is temporarily placed in the buffer
We can determine that pause is returned when false is returned, and then resume when the buffer is empty:
Const rs = fs.createReadStream (src); const ws = fs.createWriteStream (dst); rs.on ('data', function (chunk) {if (ws.write (chunk) = false) {rs.pause ();}}); rs.on (' end', function () {ws.end ();}); ws.on ('drain', function () {rs.resume ();})
In this way, the function of pausing and resuming the read rate according to the writing rate can be achieved, and the problem of back pressure is solved.
Does pipe have a back pressure problem?
Usually, we often use pipe to directly connect the Readable stream to the Writable stream, but we don't seem to have encountered the problem of back pressure. In fact, the read rate has been dynamically adjusted within the pipe.
Const rs = fs.createReadStream (src); const ws = fs.createWriteStream (dst); rs.pipe (ws); Summary
Stream is a common idea when transmitting data, which is a part of the transmission content, and it is the basic concept of file reading and writing and network communication.
Node.js also provides api for stream, including Readable readable stream, Writable writable stream, Duplex duplex stream and Transform conversion stream. They implement the _ read, _ write, _ read + _ write, _ transform methods for data return and processing, respectively.
To create a Readable object, you can either call Readable api creation directly and then override the _ read method, or you can inherit a subclass from the Readable implementation and instantiate it later. Other streams are the same. (Readable can be easily combined with generator)
When the read rate is greater than the write rate, the phenomenon of "back pressure" will occur, which will burst the buffer and lead to data loss. the solution is to dynamic the rate of write and resume readable streams according to the rate of pause. Pipe does not have this problem because it is handled internally.
Flow is a concept that can not be bypassed by IO, and the problem of back pressure is also a common problem of flow. If you encounter data loss, you can consider whether there is back pressure. I hope this article can help you sort out your thoughts and really master stream!
!
Is it helpful for you to read the above content? If you want to know more about the relevant knowledge or read more related articles, please follow the industry information channel, thank you for your support.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.