How to Implement Streams in Node.js

2025-03-31 Update From: SLTechnology News&Howtos shulou

Shulou(Shulou.com)06/02 Report--

This article explains how to use and implement streams (Stream) in Node.js. The approach introduced here is simple, fast, and practical.

Introduction to Stream

Suppose we need to copy the contents of one file to another. We might write the following code:

const fs = require('fs');
const path = require('path');

const copy = (source, target) => {
  // read the whole source file into memory first
  fs.readFile(path.resolve(source), (err, data) => {
    if (err) {
      throw new Error(err.toString());
    }
    // then write the full contents to the target file
    fs.writeFile(path.resolve(target), data, (err) => {
      if (!err) {
        console.log("copy successful!");
      }
    });
  });
};

The code above is simple: it first reads the contents of the source file and then writes them to the target file. Its defining feature is that the entire source must be read before anything is written to the target.

This has two disadvantages. First, reading a large file may exhaust memory, because the whole file is loaded into memory. Second, loading a large file in one go takes a long time, so the user may notice a stall.

Another solution is to write while reading: read part of the file, write that part to the new file, and repeat. Only part of the content is in memory at any time, so memory usage stays low, and because writing starts while reading is still in progress, the user gets a quick response and a better experience.

Node.js provides the Stream API, which is well suited to handling large files. Because the data is processed piece by piece, like flowing water, the module is named Stream.

const fs = require('fs');

function copy(source, target) {
  const rs = fs.createReadStream(source);
  const ws = fs.createWriteStream(target);
  rs.on('data', data => {
    ws.write(data);
  });
  rs.on('end', () => {
    ws.end();
  });
}

The details of the above code will be revealed later.

Classification of Stream

Streams can be divided into four categories:

Readable: readable stream, the provider of data

Writable: writable stream, the consumer of data

Duplex: a stream that is both readable and writable (duplex stream)

Transform: a special case of Duplex, a transform stream that processes its input data and then outputs the result

Readable and writable streams are the foundation. Common readable and writable streams are as follows:

Readable stream      Writable stream
HTTP Request         HTTP Response
fs read streams      fs write streams
process.stdin        process.stdout
TCP sockets          TCP sockets
zlib streams         zlib streams
crypto streams       crypto streams

Every stream is an instance of EventEmitter and emits its own set of events.
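To make the classification concrete, the sketch below creates one bare instance of each of the four base classes and checks how they relate to each other and to EventEmitter. The helper name describeStream and the empty read/write implementations are illustrative assumptions, not code from the original article.

```javascript
const { EventEmitter } = require('events');
const { Readable, Writable, Duplex, Transform } = require('stream');

// One bare instance per base class; none of them is connected to a real data source.
const samples = {
  readable: new Readable({ read() {} }),
  writable: new Writable({ write(chunk, encoding, callback) { callback(); } }),
  duplex: new Duplex({ read() {}, write(chunk, encoding, callback) { callback(); } }),
  transform: new Transform({ transform(chunk, encoding, callback) { callback(null, chunk); } }),
};

// Hypothetical helper: report which stream families an object belongs to.
function describeStream(stream) {
  return {
    isEventEmitter: stream instanceof EventEmitter,
    isReadable: stream instanceof Readable,
    isWritable: stream instanceof Writable,
    isDuplex: stream instanceof Duplex,
  };
}

for (const [name, stream] of Object.entries(samples)) {
  console.log(name, describeStream(stream));
}
```

A Transform instance answers true to all four checks, which matches its description as a special case of Duplex.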

Readable Stream

A readable stream has two modes: paused mode and flowing mode. If we listen for the readable event after creating a stream, the stream works in paused mode. In paused mode it keeps reading data from the source into an internal buffer, whose size limit is set by the highWaterMark option (64 KB by default for fs.createReadStream), and it emits the readable event when there is data to consume. The readable event is triggered in two situations:

New data is available in the buffer, which holds up to roughly highWaterMark bytes

The data source has been read completely.

const fs = require('fs');

const rs = fs.createReadStream('a.txt', {
  highWaterMark: 1 // buffer at most 1 byte
});
rs.on('readable', () => {
  let data;
  // read() returns null when the buffer is empty
  while ((data = rs.read()) !== null) {
    console.log(data.toString());
  }
});

The program above sets highWaterMark to 1, so the readable event is triggered for every byte that is read. Each time the readable event fires, we call the read([size]) method of the readable stream to take data out of the buffer (the data is a Buffer) and print it to the console.
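The same paused-mode pattern can be tried without a file on disk. The sketch below is a minimal example under assumptions: the helper name readAllPaused and the sample strings are made up, and Readable.from() is used only to get a small in-memory source.

```javascript
const { Readable } = require('stream');

// Drain a readable stream in paused mode and resolve with everything it produced.
function readAllPaused(readable) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    readable.on('readable', () => {
      let chunk;
      // read() returns null once the internal buffer is empty
      while ((chunk = readable.read()) !== null) {
        chunks.push(Buffer.from(chunk));
      }
    });
    readable.on('end', () => resolve(Buffer.concat(chunks)));
    readable.on('error', reject);
  });
}

// Readable.from() wraps an iterable in a readable stream.
readAllPaused(Readable.from(['paused ', 'mode']))
  .then((data) => console.log(data.toString()));
```

Wrapping the events in a Promise is a design choice for the example only; the original code logs each chunk directly inside the readable handler.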

When we bind a data event handler to the readable stream, the stream switches to flowing mode. In flowing mode the stream automatically reads the file contents into the buffer (in chunks of at most highWaterMark bytes) and passes each chunk to the data event handler as soon as it is available. This process runs automatically. The end event is triggered once the whole file has been read.

const fs = require('fs');

const rs = fs.createReadStream('a.txt', {
  highWaterMark: 2
});
rs.on('data', data => {
  console.log(data.toString());
});
rs.on('end', () => {
  console.log("File read complete!");
});

Paused mode is like a manual rifle, while flowing mode is like an automatic rifle. The two modes can be switched at runtime: pause() switches a stream from flowing mode to paused mode, and resume() switches it from paused mode back to flowing mode.
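As a rough illustration of switching modes, the sketch below pauses a stream after every chunk and resumes it a little later, recording the readableFlowing property along the way. The helper name readWithPauses and the delay value are assumptions made for the example.

```javascript
const { Readable } = require('stream');

// Consume a stream in flowing mode, but pause for delayMs after every chunk.
function readWithPauses(readable, delayMs) {
  return new Promise((resolve, reject) => {
    const states = [readable.readableFlowing];      // null: no consumer attached yet
    const chunks = [];
    readable.on('data', (chunk) => {
      chunks.push(String(chunk));
      readable.pause();                             // flowing -> paused
      states.push(readable.readableFlowing);        // false while paused
      setTimeout(() => readable.resume(), delayMs); // paused -> flowing
    });
    states.push(readable.readableFlowing);          // true: the 'data' listener started the flow
    readable.on('end', () => resolve({ chunks, states }));
    readable.on('error', reject);
  });
}

readWithPauses(Readable.from(['a', 'b', 'c']), 10)
  .then(({ chunks, states }) => console.log(chunks, states.slice(0, 3)));
```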

A classic example of a readable stream is the request object req in http. The following program shows reading the contents of the HTTP request body by listening to the data event of req.

const http = require('http');

const app = http.createServer();
app.on('request', (req, res) => {
  const datas = [];
  req.on('data', data => {
    datas.push(data);
  });
  req.on('end', () => {
    req.body = Buffer.concat(datas);
    // after reading the body, return its content to the client
    res.end(req.body);
  });
});
app.listen(3000, () => {
  console.log("the service starts on port 3000.");
});

Writable Stream

A writable stream is similar to a readable stream. When we write data with the write() method, the data is written to the destination (here, a file) right away if possible. If the destination cannot keep up, the pending data is kept in a buffer. When the buffered data reaches the size set by highWaterMark, write() returns false to signal that the caller should stop writing for now.

When all the data in the buffer has been consumed (written to the file or consumed by another stream), the drain event is triggered.

const fs = require('fs');

const ws = fs.createWriteStream('b.txt', { highWaterMark: 16 * 1024 });

function writeMillionTimes(writer, data, encoding, callback) {
  let i = 10000;
  write();
  function write() {
    // whether more data can be written to the writable stream
    let ok = true;
    // note: i-- runs before ok is checked, so the counter also drops by one
    // when the loop stops because the buffer is full
    while (i-- > 0 && ok) {
      // writer.write() returns false when the buffer is full
      ok = writer.write(data, encoding, i === 0 ? callback : null);
    }
    if (i > 0) {
      // ok is false: no more data should be written to the buffer for now
      console.log("drain", i);
      // wait for the drain event, then continue writing
      writer.once('drain', write);
    }
  }
}

writeMillionTimes(ws, 'simple', 'utf-8', () => {
  console.log("end");
});

Output:

drain 7268
drain 4536
drain 1804
end

This shows that the buffer filled up to 16 KB three times. We can check this from the differences between the numbers above: each difference multiplied by 6 (the number of bytes in 'simple') is about 16 * 1024:

$(7268 - 4536) \times 6 = 16392 \approx 16384 = 16 \times 1024$
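The write-then-wait-for-drain loop can also be expressed with async/await. The following is a minimal sketch, not the article's method: writeAll and createSlowWriter are hypothetical helpers, and the 4-byte highWaterMark is chosen only so that the buffer fills up quickly.

```javascript
const { Writable } = require('stream');
const { once } = require('events');

// Write every chunk, waiting for 'drain' whenever write() reports a full buffer.
async function writeAll(writer, chunks) {
  let drainCount = 0;
  for (const chunk of chunks) {
    if (!writer.write(chunk)) {
      await once(writer, 'drain');
      drainCount++;
    }
  }
  writer.end();
  await once(writer, 'finish');
  return drainCount;
}

// Hypothetical slow destination: a tiny buffer and an asynchronous write.
function createSlowWriter(sink) {
  return new Writable({
    highWaterMark: 4,
    write(chunk, encoding, callback) {
      sink.push(chunk.toString());
      setImmediate(callback);
    },
  });
}

const sink = [];
writeAll(createSlowWriter(sink), ['simple', 'simple', 'simple'])
  .then((drainCount) => console.log(sink.join(''), 'drains:', drainCount));
```

events.once() returns a Promise that resolves when the event fires and rejects if the stream emits error, so a failed write is not silently ignored.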

We can also call the end() method of the writable stream. It flushes the buffered data to the file and closes the file, which then triggers the close event.

const fs = require('fs');

const ws = fs.createWriteStream('b.txt');
ws.write('Hello');
ws.write('World');
ws.end('!');
ws.on('close', () => {
  console.log("close"); // close
});

After the end() method has been called, write() can no longer be called; doing so reports an error.

const fs = require('fs');

const ws = fs.createWriteStream('b.txt');
ws.write('Hello');
ws.write('World');
ws.end('!');
ws.write('write again'); // Error [ERR_STREAM_WRITE_AFTER_END]: write after end
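If the error should be handled rather than crash the process, an error listener can be attached to the stream. The sketch below assumes an in-memory Writable that discards its input; the helper name writeAfterEnd is hypothetical.

```javascript
const { Writable } = require('stream');

// Resolve with the error code produced by writing to a stream that was already ended.
function writeAfterEnd(writer) {
  return new Promise((resolve) => {
    writer.on('error', (err) => resolve(err.code));
    writer.end('!');
    writer.write('write again'); // too late: end() has already been called
  });
}

// In-memory destination that simply discards every chunk.
const discard = new Writable({
  write(chunk, encoding, callback) { callback(); },
});
writeAfterEnd(discard).then((code) => console.log(code));
```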

The finish event of the writable stream is triggered after end() has been called and all buffered data has been written.

const fs = require('fs');

const ws = fs.createWriteStream('b.txt');
ws.write('Hello');
ws.write('World');
ws.end('!');
ws.on('close', () => {
  console.log("close");
});
ws.on('finish', () => {
  console.log("finish");
});

The printed result is:

finish
close

This shows that the finish event is triggered before the close event.
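The ordering can be checked without touching the file system. This sketch assumes a recent Node.js version in which a writable stream destroys itself automatically after finishing (so that close is emitted); recordEndEvents and memoryWriter are names invented for the example.

```javascript
const { Writable } = require('stream');

// Record the order in which 'finish' and 'close' fire on a writable stream.
function recordEndEvents(writer) {
  return new Promise((resolve, reject) => {
    const order = [];
    writer.on('finish', () => order.push('finish'));
    writer.on('close', () => {
      order.push('close');
      resolve(order);
    });
    writer.on('error', reject);
    writer.write('Hello');
    writer.end('!');
  });
}

// In-memory destination that accepts and drops every chunk.
const memoryWriter = new Writable({
  write(chunk, encoding, callback) { callback(); },
});
recordEndEvents(memoryWriter).then((order) => console.log(order.join(' -> ')));
```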

The classic example of a writable stream is the response object res of the http module. The following program reads an HTML page and returns it to the client when a request arrives.

const http = require('http');
const fs = require('fs');

const app = http.createServer();
app.on('request', (req, res) => {
  const rs = fs.createReadStream('index.html');
  rs.on('data', data => {
    res.write(data);
  });
  rs.on('end', () => {
    res.end();
  });
});
app.listen(3000, () => {
  console.log("Service starts on port 3000...");
});

Duplex Stream and Transform Stream

Duplex means two-way: a duplex stream can both receive and output data, and its input and output need not be related, as if a single component contained two independent systems. Duplex inherits from Readable and also has all the methods of Writable.

Transform inherits from Duplex, so it also has the abilities of both readable and writable streams, but its output is derived from its input. Common transform streams are the zlib and crypto streams.
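As a small illustration of a transform stream, the sketch below writes text into a gzip stream through its writable side and collects the compressed bytes from its readable side. The function name gzipText and the sample text are assumptions made for the example.

```javascript
const zlib = require('zlib');
const { Transform } = require('stream');

// Compress a string by using both sides of a zlib transform stream.
async function gzipText(text) {
  const gzip = zlib.createGzip();
  const isTransform = gzip instanceof Transform;
  gzip.end(text);                      // writable side: feed the input and finish
  const chunks = [];
  for await (const chunk of gzip) {    // readable side: collect the transformed output
    chunks.push(chunk);
  }
  return { isTransform, compressed: Buffer.concat(chunks) };
}

gzipText('stream '.repeat(100)).then(({ isTransform, compressed }) => {
  console.log(isTransform, compressed.length);
});
```

Decompressing the result with zlib.gunzipSync() gives back the original text, which shows that the output depends on the input.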

To keep the structure of the article clear, we will not explain these two kinds of streams in detail here; we will implement them later to deepen our understanding.

Pipe

We can use a mixture of readable and writable streams to copy files.

const fs = require('fs');

function copy(source, target) {
  const rs = fs.createReadStream(source);
  const ws = fs.createWriteStream(target);
  rs.on('data', data => {
    ws.write(data);
  });
  rs.on('end', () => {
    ws.end();
  });
}

copy('a.txt', 'b.txt');

However, this approach is not recommended because it ignores the speed difference between the readable stream and the writable stream. If the readable stream produces data faster than the writable stream can consume it, data keeps piling up in the buffer and memory consumption grows. This problem is known as backpressure.

We need to improve the program above. When write() returns false, we switch the readable stream to paused mode. When the writable stream triggers the drain event, we switch the readable stream back to flowing mode.

const fs = require('fs');

function copy(source, target) {
  const rs = fs.createReadStream(source);
  const ws = fs.createWriteStream(target);
  rs.on('data', data => {
    // the buffer is full: stop reading until it drains
    if (!ws.write(data)) {
      rs.pause();
    }
  });
  rs.on('end', () => {
    ws.end();
  });
  ws.on('drain', () => {
    rs.resume();
  });
}

Does that mean we have to write this much code every time we use streams? Of course not. Node.js provides a pipe(ws) method on readable streams. It takes a writable stream, writes the data from the readable stream into it, and handles the speed difference internally. So the code above can be changed to the following version.

const fs = require('fs');

function copy(source, target) {
  const rs = fs.createReadStream(source);
  const ws = fs.createWriteStream(target);
  rs.pipe(ws);
}
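One thing pipe() does not do is report when the copy has finished or failed. A possible alternative, shown here as a sketch rather than as the article's method, is stream.pipeline(), which calls back once when all streams are done or when any of them fails. The helper name copyWithPipeline and the temporary file paths are assumptions.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');
const { pipeline } = require('stream');
const { promisify } = require('util');

const pipelineAsync = promisify(pipeline);

// Copy a file; the returned Promise settles when the copy is complete or has failed.
async function copyWithPipeline(source, target) {
  await pipelineAsync(fs.createReadStream(source), fs.createWriteStream(target));
}

// Demo on throwaway files in the OS temp directory (illustrative paths).
const demoDir = fs.mkdtempSync(path.join(os.tmpdir(), 'stream-copy-'));
const demoSource = path.join(demoDir, 'a.txt');
const demoTarget = path.join(demoDir, 'b.txt');
fs.writeFileSync(demoSource, 'copied through pipeline');
copyWithPipeline(demoSource, demoTarget)
  .then(() => console.log(fs.readFileSync(demoTarget, 'utf8')));
```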

When we call the pipe method, the pipe event of the writable stream is triggered. A simplified reference implementation of pipe is as follows:

Readable.prototype.pipe = function (ws) {
  this.on('data', data => {
    if (!ws.write(data)) {
      this.pause();
    }
  });
  ws.on('drain', () => {
    this.resume();
  });
  // trigger the pipe event
  ws.emit('pipe', this);
  // return the writable stream to support chained calls
  return ws;
};
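Both behaviours, the pipe event and the returned destination, can be observed on the built-in implementation. In the sketch below, inspectPipe is a hypothetical helper and PassThrough is used only as a convenient destination.

```javascript
const { Readable, PassThrough } = require('stream');

// Pipe source into destination and report what the 'pipe' event and the return value were.
function inspectPipe(source, destination) {
  let pipedSource = null;
  destination.once('pipe', (src) => { pipedSource = src; });
  const returned = source.pipe(destination);
  return {
    pipeEventSawSource: pipedSource === source,
    returnedDestination: returned === destination,
  };
}

const upstream = Readable.from(['chained ', 'pipes']);
const middle = new PassThrough();
console.log(inspectPipe(upstream, middle));

// Because pipe() returns its destination, calls can be chained.
middle.pipe(new PassThrough()).on('data', (chunk) => console.log(chunk.toString()));
```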

Here is a flow chart of pipe drawn on the official website.

[Flow chart from the Node.js documentation: src.pipe(dest) sets up the piping functions and the event callbacks (.on('close', cb), .on('data', cb), .on('drain', cb), .on('unpipe', cb), .on('error', cb), .on('finish', cb), .on('end', cb)), which exist outside the data flow. Data moves from the Readable stream to the Writable stream through .write(chunk). If the chunk is too big or the queue is busy, write() returns false and the readable stream is paused with .pause(); once the buffer has emptied, the writable stream emits drain and the readable stream continues with .resume(). Otherwise the chunk is added to the queue.]

…
  console.log(data.toString());
});
rs.on('end', () => {
  console.log("iteration ends");
});

Output:

01234
iteration ends

Implement writable stream

Implementing a writable stream is similar to implementing a readable stream: first inherit from the Writable class, then implement the _write() method.

const fs = require('fs');
const { Writable } = require('stream');

class FileWritableStream extends Writable {
  constructor(filepath) {
    super();
    this.filepath = filepath;
  }

  _write(chunk, encoding, callback) {
    // append the chunk to the file, then signal completion through callback
    fs.appendFile(this.filepath, chunk, { encoding }, callback);
  }
}

Above we implemented a writable stream that takes a file path as a parameter and appends data to that file. When we call the write() method, the stream passes the chunk to _write(), which appends it to the file; chunks written while a previous _write() call is still in progress wait in the buffer until its callback has been called.

process.stdin.pipe(new FileWritableStream('c.txt'));

The line above writes the characters received on standard input to c.txt.
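For experimenting without files or standard input, a writable stream can also collect its chunks in memory. The class below is an illustrative sketch (MemoryWritable, chunks and writeCalls are made-up names) that counts how often _write() is invoked.

```javascript
const { Writable } = require('stream');

// Hypothetical in-memory writable stream that keeps every chunk it receives.
class MemoryWritable extends Writable {
  constructor(options) {
    super(options);
    this.chunks = [];
    this.writeCalls = 0;
  }

  _write(chunk, encoding, callback) {
    this.writeCalls++;
    this.chunks.push(chunk); // strings arrive as Buffers by default
    callback();              // signal that this chunk has been handled
  }

  toString() {
    return Buffer.concat(this.chunks).toString();
  }
}

const memory = new MemoryWritable();
memory.on('finish', () => console.log(memory.toString(), memory.writeCalls));
memory.write('Hello');
memory.write(' World');
memory.end('!');
```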

Implement duplex stream

A duplex stream can be used as either a readable stream or a writable stream, and its input and output need not be related. Duplex inherits from Readable and has all the methods of Writable. We just need to implement the _read() and _write() methods.

const { Duplex } = require('stream');

class CustomDuplexStream extends Duplex {
  constructor() {
    super();
    this.currentCharCode = 65;
  }

  _read() {
    if (this.currentCharCode …
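The listing above is cut off in the source. As a separate, self-contained illustration (not a reconstruction of the original code), the sketch below shows one way a duplex stream with independent sides might look: the readable side emits the letters A to Z, while the writable side just stores whatever it receives. The class name LettersDuplex and its fields are assumptions.

```javascript
const { Duplex } = require('stream');

// Hypothetical duplex stream whose readable and writable sides are unrelated.
class LettersDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.charCode = 65;   // 'A'
    this.received = [];   // everything written to the writable side
  }

  // Readable side: produce one letter per call until 'Z', then end the stream.
  _read() {
    if (this.charCode <= 90) {
      this.push(String.fromCharCode(this.charCode++));
    } else {
      this.push(null);
    }
  }

  // Writable side: store the incoming chunk; it never affects the output.
  _write(chunk, encoding, callback) {
    this.received.push(chunk.toString());
    callback();
  }
}

async function runLettersDemo() {
  const duplex = new LettersDuplex();
  duplex.write('independent input');
  duplex.end();
  let letters = '';
  for await (const chunk of duplex) {
    letters += chunk;
  }
  return { letters, received: duplex.received };
}

runLettersDemo().then((result) => console.log(result));
```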
