This article walks through the source code of the Node.js readable stream in detail. The material is shared here for reference; hopefully you will come away with a solid understanding of how readable streams work after reading it.
A readable stream is an abstraction over data consumption. Readable streams in Node.js have two working modes: flowing and paused. In flowing mode, a callback is triggered whenever data becomes available and the data is passed to that callback. In paused mode, the user has to call read explicitly. Let's look at the main logic of readable streams through the source code. Because there is a lot of code and the control flow is rather convoluted, this article only analyses the main paths; if you are interested in the details, refer to the documentation or dig into the source yourself. The sketch below contrasts the two modes; after that, we start with ReadableState, which holds the state and properties of a readable stream.
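Before diving into the source, here is a minimal sketch of the two modes (not from the article's source; it assumes Node.js 12+ for Readable.from):

const { Readable } = require('stream');

// Flowing mode: attaching a 'data' listener switches the stream to flowing,
// and chunks are delivered to the callback as they become available.
const flowing = Readable.from(['a', 'b', 'c']);
flowing.on('data', (chunk) => console.log('flowing:', chunk));

// Paused mode: the consumer calls read() manually, typically inside a
// 'readable' event handler, until read() returns null.
const paused = Readable.from(['x', 'y', 'z']);
paused.on('readable', () => {
  let chunk;
  while ((chunk = paused.read()) !== null) {
    console.log('paused:', chunk);
  }
});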
function ReadableState(options, stream) {
  options = options || {};
  // whether this is a duplex (two-way) stream
  var isDuplex = stream instanceof Stream.Duplex;
  // data mode
  this.objectMode = !!options.objectMode;
  // for a duplex stream, the readable side can set its own mode
  if (isDuplex)
    this.objectMode = this.objectMode || !!options.readableObjectMode;
  // stop reading once highWaterMark bytes (or 16 objects in object mode) are buffered
  this.highWaterMark = getHighWaterMark(this,
                                        options,
                                        'readableHighWaterMark',
                                        isDuplex);
  // the buffer where the data is stored
  this.buffer = new BufferList();
  // length of readable data
  this.length = 0;
  // pipe destinations and their count
  this.pipes = null;
  this.pipesCount = 0;
  // working mode
  this.flowing = null;
  // whether the stream has ended
  this.ended = false;
  // whether the end event has been emitted
  this.endEmitted = false;
  // whether data is currently being read
  this.reading = false;
  // whether events are emitted synchronously
  this.sync = true;
  // whether the readable event needs to be emitted
  this.needReadable = false;
  // whether the readable event has been emitted
  this.emittedReadable = false;
  // whether the readable event is being listened for
  this.readableListening = false;
  // whether a resume is currently scheduled
  this.resumeScheduled = false;
  // whether the stream has been destroyed
  this.destroyed = false;
  // data encoding
  this.defaultEncoding = options.defaultEncoding || 'utf8';
  // when piping, how many writable destinations have reached their threshold
  // and need to wait for a drain event; awaitDrain records that count
  this.awaitDrain = 0;
  // set to true while the maybeReadMore function is executing
  this.readingMore = false;
  this.decoder = null;
  this.encoding = null;
  // codec
  if (options.encoding) {
    if (!StringDecoder)
      StringDecoder = require('string_decoder').StringDecoder;
    this.decoder = new StringDecoder(options.encoding);
    this.encoding = options.encoding;
  }
}
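As an aside, a few of these fields surface through public getters; a small sketch, assuming a reasonably recent Node.js version:

const { Readable } = require('stream');

// Construct a stream with some of the options discussed above and see how
// they show up as state via the public getters.
const r = new Readable({
  encoding: 'utf8',
  highWaterMark: 1024,
  read() {}, // no-op producer, just for illustration
});

console.log(r.readableHighWaterMark); // 1024
console.log(r.readableObjectMode);    // false
console.log(r.readableLength);        // 0 - nothing buffered yet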
ReadableState contains a lot of fields; we can skip over them for now and come back when a field is needed. Next, let's look at the implementation of the readable stream itself.
function Readable(options) {
  if (!(this instanceof Readable))
    return new Readable(options);
  this._readableState = new ReadableState(options, this);
  // readable
  this.readable = true;
  // two hook functions that the user can supply
  if (options) {
    if (typeof options.read === 'function')
      this._read = options.read;
    if (typeof options.destroy === 'function')
      this._destroy = options.destroy;
  }
  // initialize the parent class
  Stream.call(this);
}
There is not much logic above; the two functions to pay attention to are read and destroy. If we use Readable directly, options must provide a read function, while destroy is optional. If we use Readable by inheriting from it, we must implement the _read function. Node.js only abstracts the logic of a stream; the concrete operation (for example, where a readable stream actually reads its data from) is implemented by the user, because the read operation is business-specific. A sketch of both usage styles follows. Let's then analyse how the readable stream operates.
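A minimal sketch of the two styles (hypothetical example, not taken from the article's source):

const { Readable } = require('stream');

// Style 1: pass read via the constructor options.
const fromOptions = new Readable({
  read(size) {
    // produce data here and hand it to the stream
    this.push('hello');
    this.push(null); // signal the end of the stream
  },
});
fromOptions.on('data', (chunk) => console.log(chunk.toString()));

// Style 2: subclass Readable and implement _read.
class Counter extends Readable {
  constructor(limit) {
    super();
    this.current = 0;
    this.limit = limit;
  }
  _read() {
    if (this.current < this.limit) {
      this.push(String(this.current++));
    } else {
      this.push(null);
    }
  }
}

new Counter(3).on('data', (chunk) => console.log(chunk.toString()));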
1 The readable stream obtains data from the underlying resource
For the user, a readable stream is where data comes from; but from the stream's point of view, before it can provide data to the user it first has to have data of its own, so the readable stream needs to produce data first. The production logic is implemented by the _read function, whose logic is roughly:
const data = getSomeData();
readableStream.push(data);
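A slightly fuller, hypothetical version of this pattern, where getSomeData simulates an asynchronous underlying resource with a timer:

const { Readable } = require('stream');

// Hypothetical asynchronous source: hands back one chunk per call,
// or null once it is exhausted.
const chunks = ['first', 'second', 'third'];
function getSomeData(callback) {
  setTimeout(() => callback(chunks.shift() || null), 10);
}

const readableStream = new Readable({
  read() {
    // _read may complete asynchronously; push the data whenever it arrives
    getSomeData((data) => this.push(data));
  },
});

readableStream.on('data', (chunk) => console.log(chunk.toString()));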
Through the push function we can write data into the readable stream, which can then be handed to the user. Let's look at the implementation of push, listing only the main logic.
Readable.prototype.push = function(chunk, encoding) {
  // the encoding-handling code has been omitted
  return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
};

function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
  var state = stream._readableState;
  // pushing null means the stream has ended
  if (chunk === null) {
    state.reading = false;
    onEofChunk(stream, state);
  } else {
    addChunk(stream, state, chunk, false);
  }
  // return whether more data can be read
  return needMoreData(state);
}
function addChunk(stream, state, chunk, addToFront) {
  // in flowing mode, with nothing cached, emit the data event directly
  if (state.flowing && state.length === 0 && !state.sync) {
    stream.emit('data', chunk);
  } else {
    // otherwise cache the data first
    state.length += state.objectMode ? 1 : chunk.length;
    if (addToFront)
      state.buffer.unshift(chunk);
    else
      state.buffer.push(chunk);
    // if a readable event needs to be emitted, emit it
    if (state.needReadable)
      emitReadable(stream);
  }
  // keep reading more data if possible
  maybeReadMore(stream, state);
}
In general, the readable stream first has to get data from somewhere; depending on the current working mode it either delivers the data to the user directly or caches it first, and, if possible, it keeps fetching more data.
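The return value of push (needMoreData above) tells the producer whether the buffer is still below highWaterMark. A small sketch of that in practice (hypothetical example; the tiny highWaterMark is chosen only for the demonstration):

const { Readable } = require('stream');

const stream = new Readable({
  objectMode: true,
  highWaterMark: 2, // buffer at most 2 objects
  read() {},        // production is driven manually below
});

console.log(stream.push('a')); // true  - 1 object buffered, still below the limit
console.log(stream.push('b')); // false - 2 objects buffered, limit reached
console.log(stream.read());    // 'a'
console.log(stream.read());    // 'b'
stream.push(null);             // end the stream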
2 Users obtain data from the readable stream
Users can get data from a readable stream by calling the read function or by listening for data events.
Readable.prototype.read = function(n) {
  n = parseInt(n, 10);
  var state = this._readableState;
  // work out how much can be read
  n = howMuchToRead(n, state);
  var ret;
  // if the amount to read is greater than 0, fetch the data into ret and return it
  if (n > 0)
    ret = fromList(n, state);
  else
    ret = null;
  // subtract the length just read
  state.length -= n;
  // if the cache is empty, or falls below the threshold after this read,
  // the readable stream can keep fetching data from the underlying resource
  if (state.length === 0 || state.length - n < state.highWaterMark) {
    this._read(state.highWaterMark);
  }
  // emit the data event
  if (ret !== null)
    this.emit('data', ret);
  return ret;
};
Reading data boils down to working out how much data is in the cache and how much the user asked for, taking the smaller of the two, returning it to the user, and emitting the data event. If the cached data has not reached the threshold, the readable stream is triggered to fetch more data from the underlying resource.
3 Destroying the stream
function destroy(err, cb) {
  // set the destroyed flag
  if (this._readableState) {
    this._readableState.destroyed = true;
  }
  // run the _destroy hook, which the user can override
  this._destroy(err || null, (err) => {
    // if there is an error but no callback was provided, emit the error event
    if (!cb && err) {
      process.nextTick(() => {
        this.emit('error', err);
      }, this, err);
    } else if (cb) {
      // if a callback was provided, call it
      cb(err);
    }
  });
  return this;
}
Let's take a look at the default _destroy function provided by Readable.
Readable.prototype._destroy = function(err, cb) {
  this.push(null);
  cb(err);
};
As we saw when analysing the push function, this.push(null) marks the end of the stream. Destroying a stream means closing the underlying resource behind it and no longer serving data. A sketch of overriding _destroy to release such a resource follows.
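A minimal sketch of overriding _destroy to release an underlying resource; FileBackedStream and its file-descriptor handling are hypothetical, just to illustrate the hook:

const { Readable } = require('stream');
const fs = require('fs');

class FileBackedStream extends Readable {
  constructor(path) {
    super();
    // the underlying resource: a file descriptor
    this.fd = fs.openSync(path, 'r');
  }
  _read(size) {
    const buf = Buffer.alloc(size);
    const bytesRead = fs.readSync(this.fd, buf, 0, size, null);
    // push(null) when the underlying resource has no more data
    this.push(bytesRead > 0 ? buf.subarray(0, bytesRead) : null);
  }
  _destroy(err, cb) {
    // close the underlying resource, then report completion via the callback
    fs.close(this.fd, (closeErr) => cb(err || closeErr));
  }
}

const s = new FileBackedStream(__filename);
s.on('data', (chunk) => process.stdout.write(chunk));
s.on('end', () => s.destroy()); // a no-op if autoDestroy has already run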
Summary: that is the end of this article. The stream implementation is not particularly difficult, but the control flow is quite convoluted; if you are interested, take a detailed look at the source code. Finally, here is a diagram drawn a while ago: https://www.processon.com/view/link/5cc7e9e5e4b09eb4ac2e0688.