This article looks at what the readable stream in Node.js is. The approach is simple and practical: first the basic concepts, then a walk through the core of the readable stream implementation in the Node.js source code.
1. Basic concept
1.1. The historical evolution of streams
Streams are not a concept unique to Node.js. They were introduced in the Unix operating system decades ago, and programs can interact with each other over streams through the pipe operator (|).
The pipe operator (|) is available on Unix-based systems such as macOS and Linux; it turns the output of the process on its left into the input of the process on its right.
In Node, if we use the traditional readFile to read a file, the file is read into memory from beginning to end, and only when all of it has been read is the content that was loaded into memory processed, in one go.
This has two disadvantages, illustrated by the sketch after the list:
Memory: the whole file is held in memory at once
Time: processing cannot start until the entire payload of data has been loaded
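A minimal sketch of the difference, assuming a file ./big.txt exists (the file name and the handle callback are illustrative):

const fs = require('fs');

const handle = (buf) => { /* consume one chunk of data */ };

// readFile: the whole file is buffered in memory before the callback runs.
fs.readFile('./big.txt', (err, data) => {
  if (err) throw err;
  handle(data); // processing starts only after the entire payload is loaded
});

// createReadStream: chunks are handed over as they are read,
// so memory stays bounded and processing starts right away.
fs.createReadStream('./big.txt')
  .on('data', handle)
  .on('end', () => console.log('done'));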
To solve these problems, Node.js imitates and implements the concept of streams. There are four types of streams in Node.js, all of which are instances of EventEmitter:
Readable stream (Readable Stream)
Writable stream (Writable Stream)
Duplex stream (Duplex Stream), which is both readable and writable
Transform stream (Transform Stream)
To work through this topic step by step and build up the concept of streams in Node.js, and because the source code is fairly complex, I decided to start learning from the readable stream.
1.2. What is a stream (Stream)
A stream is an abstract data structure, a collection of data, in which only the following data types can be stored (when objectMode === false):
String
Buffer
We can think of a stream as a collection of such data, like a liquid. First we store the liquid in a container (the stream's internal buffer, a BufferList); then, when the corresponding event fires, we pour the liquid into the pipe and notify the other side to fetch it with their own container at the other end of the pipe for processing.
1.3. What is a readable stream (Readable Stream)
A readable stream is a type of stream. It has two modes and three states.
Two read modes:
Flowing mode: data is read from the underlying system and passed to registered event handlers through the EventEmitter as quickly as possible
Paused mode: data is not read in this mode; Stream.read() must be called explicitly to read data out of the stream
Three states (a sketch of the transitions follows the list):
readableFlowing === null: no data is produced. Calling Stream.pipe() or Stream.resume() changes the state to true, and the stream starts producing data and actively emitting events.
readableFlowing === false: the flow of data is paused at this point, but the production of data is not, so a data backlog builds up.
readableFlowing === true: data is produced and consumed normally.
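A minimal sketch of these transitions (the inline comments describe the documented behavior):

const { Readable } = require('stream');

const r = new Readable({ read() {} });
console.log(r.readableFlowing); // null: no consumption mechanism attached yet

r.on('data', (chunk) => {
  console.log('data:', chunk.toString());
  r.pause(); // switch back to paused mode; 'data' stops firing
  console.log(r.readableFlowing); // false
});
console.log(r.readableFlowing); // true: adding a 'data' listener resumes the stream

r.push('hello'); // logs: data: hello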
2. Basic principles
2.1. Internal state definition (ReadableState)
ReadableState
_readableState: ReadableState {
  objectMode: false,        // this mode must be enabled to push data of types other than string, Buffer, null
  highWaterMark: 16384,     // high-water mark, 1024 * 16, 16kb by default; once it is exceeded, _read() is no longer called to read data into the buffer
  buffer: BufferList { head: null, tail: null, length: 0 }, // buffer linked list, used to store the data
  length: 0,                // size of the whole readable stream's data; equal to buffer.length in objectMode
  pipes: [],                // holds all of the piped destination queues
  flowing: null,            // null, false, or true
  ended: false,             // all of the data has been consumed
  endEmitted: false,        // whether the 'end' event has been emitted
  reading: false,           // whether data is currently being read
  constructed: true,        // the stream cannot be destroyed before it is constructed, or if construction failed
  sync: true,               // whether to emit the 'readable'/'data' event synchronously, or wait until the next tick
  needReadable: false,      // whether a 'readable' event needs to be emitted
  emittedReadable: false,   // whether the 'readable' event has been emitted
  readableListening: false, // whether there is a 'readable' listener
  resumeScheduled: false,   // whether the resume method has been called
  errorEmitted: false,      // whether the 'error' event has been emitted
  emitClose: true,          // whether to emit the 'close' event when the stream is destroyed
  autoDestroy: true,        // destroy is called automatically after the 'end' event is emitted
  destroyed: false,         // whether the stream has been destroyed
  errored: null,            // identifies whether the stream has errored
  closed: false,            // whether the stream has been closed
  closeEmitted: false,      // whether the 'close' event has been emitted
  defaultEncoding: 'utf8',  // default character encoding format
  awaitDrainWriters: null,  // reference to the writer(s) waiting for the 'drain' event; type: null, Writable, or Set
  multiAwaitDrain: false,   // whether multiple writers are waiting for the 'drain' event
  readingMore: false,       // whether more data can be read
  dataEmitted: false,       // whether data has been emitted
  decoder: null,            // decoder
  encoding: null,           // encoding
  [Symbol(kPaused)]: null
}
2.2. Internal data storage implementation (BufferList)
BufferList is the container the stream uses to hold its internal data. It is designed as a linked list with three properties: head, tail, and length.
I represent each node in the BufferList as a BufferNode, whose inner Data type depends on objectMode.
This data structure fetches data from the head faster than Array.prototype.shift().
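To see why, here is a minimal linked-list queue in the spirit of BufferList (a simplified sketch, not the real implementation):

// Removing the head of a linked list is O(1): only the head pointer moves.
// Array.prototype.shift() is O(n): every remaining element is re-indexed.
class SimpleBufferList {
  constructor() {
    this.head = null;
    this.tail = null;
    this.length = 0;
  }
  push(data) {
    const node = { data, next: null };
    if (this.tail) this.tail.next = node;
    else this.head = node;
    this.tail = node;
    ++this.length;
  }
  shift() {
    if (!this.head) return undefined;
    const { data } = this.head;
    this.head = this.head.next;
    if (!this.head) this.tail = null;
    --this.length;
    return data;
  }
}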
2.2.1. Data storage type
If objectMode === true:
Then data can be of any type; whatever is pushed is what gets stored.
objectMode = true
const Stream = require('stream');

const readableStream = new Stream.Readable({
  objectMode: true,
  read() {},
});

readableStream.push({ name: 'lisa' });
console.log(readableStream._readableState.buffer.tail);
readableStream.push(true);
console.log(readableStream._readableState.buffer.tail);
readableStream.push('lisa');
console.log(readableStream._readableState.buffer.tail);
readableStream.push(666);
console.log(readableStream._readableState.buffer.tail);
readableStream.push(() => {});
console.log(readableStream._readableState.buffer.tail);
readableStream.push(Symbol(1));
console.log(readableStream._readableState.buffer.tail);
readableStream.push(BigInt(123));
console.log(readableStream._readableState.buffer.tail);
Running result: each pushed value is stored in the tail node exactly as it was passed in, whatever its type.
If objectMode === false:
Then data can only be a string, Buffer, or Uint8Array.
objectMode = false
const Stream = require('stream');

const readableStream = new Stream.Readable({
  objectMode: false,
  read() {},
});

readableStream.push({ name: 'lisa' });
Running result: an ERR_INVALID_ARG_TYPE error is thrown, because in this mode the chunk may only be a string, Buffer, or Uint8Array.
2.2.2. Data storage structure
We create a readable stream in the node command line to observe the changes to the data in buffer:
Of course, before pushing data we need to implement its _read method, or implement the read method in the constructor's options:
const Stream = require('stream');
const readableStream = new Stream.Readable();
readableStream._read = function (size) {};
Or
const Stream = require('stream');
const readableStream = new Stream.Readable({
  read(size) {},
});
After a readableStream.push('abc') operation, the current buffer is as follows:
You can see that the data is now stored; the head and tail both hold the ASCII codes of the string 'abc', stored as a Buffer, and length is the number of chunks currently saved rather than the byte size of the data.
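This can be inspected directly in the REPL (the printed shape is approximate):

const Stream = require('stream');
const readableStream = new Stream.Readable({ read(size) {} });

readableStream.push('abc');
console.log(readableStream._readableState.buffer.head);
// { data: <Buffer 61 62 63>, next: null }  (0x61 0x62 0x63 are 'a', 'b', 'c')
console.log(readableStream._readableState.buffer.length);
// 1: one chunk is stored, not three bytes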
2.2.3. Related API
Printing all the methods of BufferList gives: push, unshift, shift, clear, first, concat, join, consume, _getString, and _getBuffer.
Except for join, which serializes the BufferList into a string, all of them are operations for accessing data.
Rather than going through every method, I will focus on consume, _getString, and _getBuffer.
2.2.3.1. consume
Source code address: BufferList.consume https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L80
consume

// Consumes a specified amount of bytes or characters from the buffered data.
consume(n, hasStrings) {
  const data = this.head.data;
  if (n < data.length) {
    // `slice` is the same for buffers and strings.
    const slice = data.slice(0, n);
    this.head.data = data.slice(n);
    return slice;
  }
  if (n === data.length) {
    // First chunk is a perfect match.
    return this.shift();
  }
  // Result spans more than one buffer.
  return hasStrings ? this._getString(n) : this._getBuffer(n);
}

The code has three branches:
If the number of bytes consumed is less than the length of the data stored in the head node, take the first n bytes of the head node's data and set the head node's data to the remaining slice.
If the number of bytes consumed is exactly equal to the length of the data stored in the head node, return the head node's data directly.
If the number of bytes consumed is greater than the length of the head node's data, a final check on the second argument determines whether the underlying BufferList stores strings or Buffers.
2.2.3.2. _getBuffer
Source code address: BufferList._getBuffer https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L137
_getBuffer

// Consumes a specified amount of bytes from the buffered data.
_getBuffer(n) {
  const ret = Buffer.allocUnsafe(n);
  const retLen = n;
  let p = this.head;
  let c = 0;
  do {
    const buf = p.data;
    if (n > buf.length) {
      TypedArrayPrototypeSet(ret, buf, retLen - n);
      n -= buf.length;
    } else {
      if (n === buf.length) {
        TypedArrayPrototypeSet(ret, buf, retLen - n);
        ++c;
        if (p.next)
          this.head = p.next;
        else
          this.head = this.tail = null;
      } else {
        TypedArrayPrototypeSet(ret, new Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n);
        this.head = p;
        p.data = buf.slice(n);
      }
      break;
    }
    ++c;
  } while ((p = p.next) !== null);
  this.length -= c;
  return ret;
}
In general, it loops over the nodes in the linked list and creates a new Buffer to store the returned data.
It starts by fetching data from the head node of the list and copying it into the new Buffer, until it reaches a node whose data is greater than or equal to the length still to be fetched (the requested length minus what has already been copied).
If the last node of the list is reached without the requested length being met, the newly created Buffer is returned as-is.
2.2.3.3. _getString
Source code address: BufferList._getString https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L106
_getString

// Consumes a specified amount of characters from the buffered data.
_getString(n) {
  let ret = '';
  let p = this.head;
  let c = 0;
  do {
    const str = p.data;
    if (n > str.length) {
      ret += str;
      n -= str.length;
    } else {
      if (n === str.length) {
        ret += str;
        ++c;
        if (p.next)
          this.head = p.next;
        else
          this.head = this.tail = null;
      } else {
        ret += StringPrototypeSlice(str, 0, n);
        this.head = p;
        p.data = StringPrototypeSlice(str, n);
      }
      break;
    }
    ++c;
  } while ((p = p.next) !== null);
  this.length -= c;
  return ret;
}
Operating on strings is the same as operating on Buffers: a loop reads data from the head of the linked list. The differences are only in how the data is copied and stored, and _getString returns data of type string.
2.3. Why is a readable stream an instance of EventEmitter?
To answer this question, the first step is to understand the publish-subscribe pattern. The publish-subscribe pattern has important applications in most APIs; from Promise to Redux, advanced APIs based on it are everywhere.
Its advantage is that callbacks related to an event can be stored in a queue, and the other party notified at some point in the future, achieving a separation of concerns: the producer only produces data and notifies the consumer, while the consumer only handles the corresponding event and its data. The Node.js stream model fits this characteristic exactly.
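A minimal sketch of the pattern using Node's EventEmitter (the channel and the event name are illustrative):

const EventEmitter = require('events');

const channel = new EventEmitter();

// The consumer only cares about handling the data it is notified about.
channel.on('data', (chunk) => console.log('consumed:', chunk));

// The producer only cares about producing data and notifying subscribers.
channel.emit('data', 'hello');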
So how does the Node.js stream create instances based on EventEmitter?
This part of the source code is here: stream/legacy https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/legacy.js#L10
Legacy
function Stream(opts) {
  EE.call(this, opts);
}
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
ObjectSetPrototypeOf(Stream, EE);
Then there are several lines of code in the source code of the readable stream:
This part of the source code is here: readable https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/readable.js#L77
Readable

ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);
First, the prototype object of Stream inherits from EventEmitter, so that all instances of Stream can access the methods on EventEmitter.
At the same time, the static methods on EventEmitter are inherited through ObjectSetPrototypeOf(Stream, EE), and in the constructor of Stream, the constructor EE is invoked to inherit all the properties of EventEmitter. Then, in the readable stream, the same approach implements the prototype inheritance and static-property inheritance of the Stream class, which gives:
Readable.prototype.__proto__ === Stream.prototype;
Stream.prototype.__proto__ === EE.prototype;
Therefore:
Readable.prototype.__proto__.__proto__ === EE.prototype;
So the prototype of EventEmitter can be found by walking up the prototype chain of the readable stream, which realizes the inheritance from EventEmitter.
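These relationships can be verified directly (a quick sketch; each check just walks the chain described above):

const Stream = require('stream');
const EE = require('events');

const r = new Stream.Readable({ read() {} });

console.log(r instanceof EE); // true
console.log(Object.getPrototypeOf(Stream.Readable.prototype) === Stream.prototype); // true
console.log(Object.getPrototypeOf(Stream.prototype) === EE.prototype); // true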
2.4. Implementation of related API
The APIs are presented in the order in which they appear in the source file, and only the core API implementations are interpreted.
Note: only the functions declared in the Node.js readable stream source code are interpreted here; function definitions introduced from outside are not included, and to save space, not all of the code is copied.
Readable.prototype
Stream {
  destroy: [Function: destroy],
  _undestroy: [Function: undestroy],
  _destroy: [Function (anonymous)],
  push: [Function (anonymous)],
  unshift: [Function (anonymous)],
  isPaused: [Function (anonymous)],
  setEncoding: [Function (anonymous)],
  read: [Function (anonymous)],
  _read: [Function (anonymous)],
  pipe: [Function (anonymous)],
  unpipe: [Function (anonymous)],
  on: [Function (anonymous)],
  addListener: [Function (anonymous)],
  removeListener: [Function (anonymous)],
  off: [Function (anonymous)],
  removeAllListeners: [Function (anonymous)],
  resume: [Function (anonymous)],
  pause: [Function (anonymous)],
  wrap: [Function (anonymous)],
  iterator: [Function (anonymous)],
  [Symbol(nodejs.rejection)]: [Function (anonymous)],
  [Symbol(Symbol.asyncIterator)]: [Function (anonymous)]
}
2.4.1. push
Readable.push

Readable.prototype.push = function (chunk, encoding) {
  return readableAddChunk(this, chunk, encoding, false);
};
The main purpose of the push method is to pass the data chunk to the downstream pipe by emitting the 'data' event, or to store the data in its own buffer.
The following is related pseudo-code, showing only the main flow:
Readable.push
function readableAddChunk(stream, chunk, encoding, addToFront) {
  const state = stream._readableState;
  if (chunk === null) {
    // Pushing null is the end-of-stream signal; no data can be written afterwards
    state.reading = false;
    onEofChunk(stream, state);
  } else if (!state.objectMode) {
    // If not in object mode
    if (typeof chunk === 'string') {
      chunk = Buffer.from(chunk);
    } else if (chunk instanceof Buffer) {
      // If it is a Buffer
      // ...processing code
    } else if (Stream._isUint8Array(chunk)) {
      chunk = Stream._uint8ArrayToBuffer(chunk);
    } else if (chunk != null) {
      err = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
    }
  }

  if (state.objectMode || (chunk && chunk.length > 0)) {
    // Object mode, or chunk is a non-empty Buffer
    // (several other insertion branches are omitted here)
    addChunk(stream, state, chunk, true);
  }
}

function addChunk(stream, state, chunk, addToFront) {
  if (state.flowing && state.length === 0 && state.sync &&
      stream.listenerCount('data') > 0) {
    // In flowing mode, with a subscriber listening for 'data'
    stream.emit('data', chunk);
  } else {
    // Otherwise save the data to the buffer
    state.length += state.objectMode ? 1 : chunk.length;
    if (addToFront) {
      state.buffer.unshift(chunk);
    } else {
      state.buffer.push(chunk);
    }
  }
  maybeReadMore(stream, state); // try to read more data
}
The push operation mainly branches on objectMode; the two cases treat the incoming data differently:
objectMode === false: convert the data (chunk) to a Buffer
objectMode === true: pass the data downstream as-is
The first branch of addChunk handles the case where the Readable is in flowing mode, has a 'data' listener, and the buffer is empty.
In that case the data is passed straight through to the programs subscribed to the 'data' event; otherwise the data is saved to the buffer, as the sketch below shows.
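A minimal sketch of both paths (inspecting _readableState is for illustration only):

const { Readable } = require('stream');

const r = new Readable({ read() {} });

// No 'data' listener yet: push() stores the chunk in the internal buffer.
r.push('buffered');
console.log(r._readableState.length); // 8

// With a 'data' listener the stream flows, and pushed chunks reach the subscriber.
r.on('data', (chunk) => console.log('data:', chunk.toString()));
r.push('direct');
// logs: data: buffered, then data: direct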
2.4.2. read
Apart from checks on boundary conditions and stream state, this method has two main operations:
Call the user-implemented _read method and process its result
Read data from the buffer and emit the 'data' event
Readable.read
// If the read length n is greater than hwm, recompute hwm
if (n > state.highWaterMark) {
  state.highWaterMark = computeNewHighWaterMark(n);
}
// Call the user-implemented _read method
try {
  const result = this._read(state.highWaterMark);
  if (result != null) {
    const then = result.then;
    if (typeof then === 'function') {
      then.call(result, nop, function (err) {
        errorOrDestroy(this, err);
      });
    }
  }
} catch (err) {
  errorOrDestroy(this, err);
}
If the user-implemented _read method returns a promise, its then method is called with success and failure callbacks, which makes exception handling convenient.
The core code with which the read method reads data out of the buffer is as follows:
Readable.read
function fromList(n, state) {
  // nothing buffered.
  if (state.length === 0)
    return null;
  let ret;
  if (state.objectMode)
    ret = state.buffer.shift();
  else if (!n || n >= state.length) {
    // Handle the cases where n is empty or greater than the buffer length:
    // read it all, truncate the list.
    if (state.decoder)
      // If there is a decoder, serialize the result into a string
      ret = state.buffer.join('');
    else if (state.buffer.length === 1)
      // There is only one chunk: return the head node's data
      ret = state.buffer.first();
    else
      // Concatenate all of the data into one Buffer
      ret = state.buffer.concat(state.length);
    state.buffer.clear(); // clear the buffer
  } else {
    // Handle the case where the read length is less than what is buffered
    ret = state.buffer.consume(n, state.decoder);
  }
  return ret;
}
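A quick sketch of read(n) pulling data out of the buffer in paused mode (the byte counts in the comments follow from the string lengths):

const { Readable } = require('stream');

const r = new Readable({ read() {} }); // no-op _read; data is pushed manually
r.push('hello world'); // 11 bytes buffered

console.log(r.read(5).toString()); // 'hello': consume(5) slices the head node
console.log(r.read().toString());  // ' world': no argument drains the rest of the buffer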
2.4.3. _read
This method must be implemented by users when they initialize a Readable stream. Inside it, push can be called to feed data and keep the read machinery going, and when we push null, writing to the stream is stopped.
Sample code:
Readable._read
const Stream = require('stream');

const readableStream = new Stream.Readable({
  read(hwm) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 122) {
      this.push(null);
    }
  },
});

readableStream.currentCharCode = 97; // 'a'
readableStream.pipe(process.stdout);
// abcdefghijklmnopqrstuvwxyz
2.4.4. Pipe (important)
Binds one or more writable streams to the current Readable stream and switches the Readable stream into flowing mode.
The method contains many event-listening handles, which are not covered in detail here:
Readable.pipe
Readable.prototype.pipe = function (dest, pipeOpts) {
  const src = this;
  const state = this._readableState;

  state.pipes.push(dest); // collect the writable stream

  src.on('data', ondata);
  function ondata(chunk) {
    const ret = dest.write(chunk);
    if (ret === false) {
      pause();
    }
  }

  // Tell the dest that it's being piped to.
  dest.emit('pipe', src);

  // Start the flow if the stream is in pause mode
  if (dest.writableNeedDrain === true) {
    if (state.flowing) {
      pause();
    }
  } else if (!state.flowing) {
    src.resume();
  }

  return dest;
};
The pipe operation is very similar to the Linux pipe operator '|': it turns the left-hand output into the right-hand input. It collects the writable streams for maintenance, and when the readable stream emits the 'data' event and data flows out, the writable stream's write is triggered, so that the data is relayed pipeline-style. A readable stream in paused mode is switched into flowing mode automatically.
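A typical use, copying one file to another (./input.txt and ./output.txt are hypothetical file names):

const fs = require('fs');

const source = fs.createReadStream('./input.txt');
const dest = fs.createWriteStream('./output.txt');

// pipe() collects dest, switches source into flowing mode, and pauses/resumes
// the source automatically based on the return value of dest.write().
source.pipe(dest);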
2.4.5. Resume
Switches the stream from 'paused' mode to 'flowing' mode. If a 'readable' event listener is set, this method has no effect.
Readable.resume
Readable.prototype.resume = function () {
  const state = this._readableState;
  if (!state.flowing) {
    // Whether we enter flowing mode depends on whether a 'readable' listener is set
    state.flowing = !state.readableListening;
    resume(this, state);
  }
};

function resume(stream, state) {
  if (!state.resumeScheduled) {
    // The flag ensures resume_ is called only once within the same tick
    state.resumeScheduled = true;
    process.nextTick(resume_, stream, state);
  }
}

function resume_(stream, state) {
  if (!state.reading) {
    stream.read(0);
  }
  state.resumeScheduled = false;
  stream.emit('resume');
  flow(stream);
}

function flow(stream) {
  // While the stream is in flowing mode, this method keeps reading data
  // from buffer until the buffer is empty
  const state = stream._readableState;
  while (state.flowing && stream.read() !== null);
  // Because read() is called here, a stream with a 'readable' listener may also
  // call read(), making the data non-contiguous (this does not affect the data
  // itself, only reading data via read() inside the 'readable' event callback)
}
2.4.6. pause
Switches the stream from flowing mode to paused mode, stops emitting the 'data' event, and saves all the data to the buffer.
Readable.pause
Readable.prototype.pause = function () {
  if (this._readableState.flowing !== false) {
    debug('pause');
    this._readableState.flowing = false;
    this.emit('pause');
  }
  return this;
};
2.5. Method of use and working mechanism
The usage was already covered in the BufferList section: create a Readable instance and implement its _read() method, or implement the read method in the options object passed as the constructor's first parameter.
2.5.1. Working mechanism
Only a rough flow was drawn here (flowchart omitted), along with the trigger conditions for the mode transitions of the Readable stream.
Where:
needReadable (true): paused mode, with data in the buffer
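A final sketch tying the mechanism together: a 'readable' listener keeps the stream in paused mode, needReadable is set, and a 'readable' event fires once data is buffered (a minimal example, not the full diagram):

const { Readable } = require('stream');

const r = new Readable({
  read() {
    this.push('some data');
    this.push(null); // end of stream
  },
});

// The 'readable' listener keeps the stream paused; we pull data out with read().
r.on('readable', () => {
  let chunk;
  while ((chunk = r.read()) !== null) {
    console.log('read:', chunk.toString());
  }
});
// logs: read: some data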