Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is the source code analysis of nodejs readable streams

2025-03-31 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

This article will explain in detail how the source code analysis of nodejs readable stream is, and the content of the article is of high quality, so the editor will share it with you for reference. I hope you will have a certain understanding of the relevant knowledge after reading this article.

Readable stream is an abstraction of data consumption. There are two working modes of readable stream in nodejs: streaming and pausing. Streaming means that when data is available, a callback will be triggered and the data will be passed to the callback. Pause means that users need to perform read operations manually. Let's take a look at some of the logic implemented by readable streams through the source code. Because the implementation of the code is more, the logic is also relatively around, this article only analyzes some of the main logic, interested can refer to the documentation or take a deep look at the source code for details. Let's first take a look at ReadableState, which represents some of the states and properties of a readable stream.

Function ReadableState (options, stream) {

Options = options | | {}

/ / whether it is a two-way flow

Var isDuplex = stream instanceof Stream.Duplex

/ / data mode

This.objectMode =!! options.objectMode

/ / set the mode of the reader for two-way flow

If (isDuplex)

This.objectMode = this.objectMode | |!! options.readableObjectMode

/ / stop when reading highWaterMark bytes and 16 objects in object mode

This.highWaterMark = getHighWaterMark (this

Options

'readableHighWaterMark'

IsDuplex)

/ / the buffer where the data is stored

This.buffer = new BufferList ()

/ / length of readable data

This.length = 0

/ / destination source and number of pipelines

This.pipes = null

This.pipesCount = 0

/ / working mode

This.flowing = null

/ / whether the stream has ended

This.ended = false

/ / whether the end event has been triggered

This.endEmitted = false

/ / whether the data is being read

This.reading = false

/ / whether to execute events synchronously

This.sync = true

/ / whether the readable event needs to be triggered

This.needReadable = false

/ / whether the readable event is triggered

This.emittedReadable = false

/ / whether the readable event is monitored

This.readableListening = false

/ / whether the process of resume is being executed

This.resumeScheduled = false

/ / has it been destroyed

/ / whether the stream has been destroyed

This.destroyed = false

/ / data encoding format

This.defaultEncoding = options.defaultEncoding | | 'utf8'

/ / in pipelining, how many writers have reached the threshold and need to wait for the drain event to be triggered, and the number of writers whose awaitDrain records have reached the threshold

This.awaitDrain = 0

/ / when executing the maybeReadMore function, set it to true

This.readingMore = false

This.decoder = null

This.encoding = null

/ / Codec

If (options.encoding) {

If (! StringDecoder)

StringDecoder = require ('string_decoder'). StringDecoder

This.decoder = new StringDecoder (options.encoding)

This.encoding = options.encoding

}

}

ReadableState contains a lot of fields, and we can ignore it and look back when we need it. Then we start to look at the implementation of readable streams.

Function Readable (options) {

If (! (this instanceof Readable))

Return new Readable (options)

This._readableState = new ReadableState (options, this)

/ / readable

This.readable = true

/ / two functions implemented by the user

If (options) {

If (typeof options.read = = 'function')

This._read = options.read

If (typeof options.destroy = = 'function')

This._destroy = options.destroy

}

/ / initialize the parent class

Stream.call (this)

}

The above logic is not much, we need to pay attention to read and destroy these two functions, if we are directly using Readable to use readable stream, then options must pass read function, destroy is optional. If we are using Readable in an inherited manner, we must implement the _ read function. Nodejs only abstracts the logic of the flow, and the specific operation (for example, the readable stream is reading data) is implemented by the user, because the read operation is business-related. Let's analyze the operation of the readable stream.

1 the readable stream obtains data from the underlying resources

For the user, the readable stream is the place where the user gets the data, but for the readable stream, the premise of providing the data to the user is that he has his own data, so the readable stream first needs production data. The logic of the production data is implemented by the _ read function. The logic of the _ read function is probably

Const data = getSomeData ()

ReadableStream.push (data)

Through the push function, you can write data to the readable stream, and then you can provide the data to the user. Let's look at the implementation of push and list only the main logic.

Readable.prototype.push = function (chunk, encoding) {

/ / the code for coding processing has been omitted

Return readableAddChunk (this, chunk, encoding, false, skipChunkCheck)

}

Function readableAddChunk (stream, chunk, encoding, addToFront, skipChunkCheck) {

Var state = stream._readableState

/ / push null represents the end of the stream

If (chunk = null) {

State.reading = false

OnEofChunk (stream, state)

} else {

AddChunk (stream, state, chunk, false)

}

/ / returns whether more data can be read

Return needMoreData (state)

}

Function addChunk (stream, state, chunk, addToFront) {

/ / if it is in stream mode and there is no data in the cache, the data event will be triggered directly

If (state.flowing & & state.length = 0 & &! state.sync) {

Stream.emit ('data', chunk)

} else {

/ / otherwise, cache the data first.

State.length + = state.objectMode? 1: chunk.length

If (addToFront)

State.buffer.unshift (chunk)

Else

State.buffer.push (chunk)

/ / if the readable event is listened for, the readable event is triggered.

If (state.needReadable)

EmitReadable (stream)

}

/ / continue to read data, if possible

MaybeReadMore (stream, state)

}

In general, the readable stream first needs to get the data from somewhere, deliver it directly to the user according to the current working mode, or cache it first. And if possible, continue to obtain data.

2 users obtain data from readable streams

Users can get data from a readable stream through the read function or by listening for data events

Readable.prototype.read = function (n) {

N = parseInt (n, 10)

Var state = this._readableState

/ / calculate the readable size

N = howMuchToRead (n, state)

Var ret

/ / if the number that needs to be read is greater than 0, the read data will be fetched to ret and returned.

If (n > 0)

Ret = fromList (n, state)

Else

Ret = null

/ / subtract the length just read

State.length-= n

/ / if there is no data in the cache or if it is less than the threshold after reading, the readable stream can continue to obtain data from the underlying resources.

If (state.length = 0 | | state.length-n

< state.highWaterMark) { this._read(state.highWaterMark); } // 触发data事件 if (ret !== null) this.emit('data', ret); return ret; }; 读取数据的操作就是计算缓存里有多少数据可以读,和用户需要的数据大小,取小的,然后返回给用户,并触发data事件。如果数据还没有达到阈值,则触发可读流从底层资源中获取数据。 3销毁流function destroy(err, cb) { // 设置已销毁标记 if (this._readableState) { this._readableState.destroyed = true; } // 执行_destroy钩子函数,用户可以重写这个函数 this._destroy(err || null, (err) =>

{

/ / if there is an error, but no callback is set, the error event is triggered

If (! cb & & err) {

Process.nextTick () = > {

This.emit ('error', err)

}, this, err)

} else if (cb) {

/ / execute if callback exists

Cb (err)

}

});

Return this

}

Let's take a look at the default _ destroy function provided by Readable.

Readable.prototype._destroy = function (err, cb) {

This.push (null)

Cb (err)

}

When I analyzed the push function just now, I already saw that this.push (null) indicates that the stream is over. Destroying the stream means shutting down the underlying resources corresponding to the stream and no longer providing data services.

Summary: this is the end of this article, the stream implementation code is not very difficult, but very around, interested can take a detailed look at the source code, and finally share a picture drawn a long time ago (link https://www.processon.com/view/link/5cc7e9e5e4b09eb4ac2e0688).

On the nodejs readable stream of source code analysis is shared here, I hope that the above content can be of some help to you, can learn more knowledge. If you think the article is good, you can share it for more people to see.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report