How to solve the problem that setting timeout () does not take effect when SparkStreaming uses mapWithState

2025-03-26 Update From: SLTechnology News&Howtos

Shulou(Shulou.com) 06/01 Report

This article explains how to solve the problem that setting timeout() does not take effect when Spark Streaming uses mapWithState. The editor finds it very practical and shares it here to learn from; I hope you get something out of it.

Preface

While testing Spark Streaming's stateful mapWithState operator, I set timeout(Seconds(3)), yet a key's state did not expire after 3 seconds without new data for that key; the expired data was only cleared after roughly 30 seconds.

I searched Baidu for a long time, but there is little information about timeout() and no article that solves this problem. So Baidu is not omnipotent; sometimes you still have to rely on yourself.

So I studied it over the weekend and then wrote up the results, hoping to give you a more comprehensive understanding of Spark state computation.

MapWithState

In theory, Spark Streaming processes data like a pipeline: each batch is independent of the others, and once a batch is processed it leaves no state behind. But some operations are inevitably stateful, such as counting how many times a word has appeared from the start of the stream until now. That is where state operations come in.

State operations come in two flavors, updateStateByKey and mapWithState, which differ greatly. Put simply, the former outputs the full state every batch, while the latter outputs only incremental state.
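To make the full-versus-incremental distinction concrete, here is a toy simulation of the two output semantics. It is a sketch in plain Python (the real operators need a Spark cluster context); the function and variable names are illustrative, not Spark's API.

```python
# Toy simulation of the two stateful operators' OUTPUT semantics.
# updateStateByKey emits the FULL state every batch; mapWithState
# emits a record only for keys that actually arrived in this batch.

def update_state_by_key(state, batch):
    """Merge the batch into state, then emit the whole state (full output)."""
    for k, v in batch:
        state[k] = state.get(k, 0) + v
    return dict(state)  # every key is re-emitted, every batch

def map_with_state(state, batch):
    """Merge the batch into state, but emit only the keys seen this batch."""
    out = []
    for k, v in batch:
        state[k] = state.get(k, 0) + v
        out.append((k, state[k]))  # incremental output
    return out

state = {}
print(update_state_by_key(state, [("tom", 1), ("amy", 1)]))  # {'tom': 1, 'amy': 1}
print(update_state_by_key(state, [("tom", 1)]))              # {'tom': 2, 'amy': 1} - amy re-emitted
state = {}
print(map_with_state(state, [("tom", 1), ("amy", 1)]))       # [('tom', 1), ('amy', 1)]
print(map_with_state(state, [("tom", 1)]))                   # [('tom', 2)] - only tom
```

The second batch contains only tom, yet updateStateByKey still emits amy: that full re-emission is exactly why mapWithState scales better on large key spaces.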

Expiration principle

Expiration is probably misunderstood by many people at first. I initially assumed the data would expire a fixed number of seconds after it appeared. Actually, no: "expiration" here refers to idle time.

The source comment says roughly this: timeout() takes an interval parameter, and if a key receives no new data for longer than this interval, it is considered idle; the function passed to mapWithState is then invoked one extra time for that key to clear its idle state.
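The idle-time semantics can be sketched in a few lines. This is a toy model in Python under the assumptions just described (a per-key last-seen timestamp reset by every new record), not Spark's implementation; all names are made up for illustration.

```python
# Toy model of mapWithState's timeout(): a key expires only after it has
# been IDLE (no new records) for longer than the interval - it is not a
# fixed TTL counted from the key's first appearance.

TIMEOUT = 3  # seconds, as in timeout(Seconds(3))

def touch(last_seen, key, now):
    """Record that data arrived for `key` at time `now` (resets the idle clock)."""
    last_seen[key] = now

def expired_keys(last_seen, now):
    """Keys whose idle time exceeds TIMEOUT."""
    return [k for k, t in last_seen.items() if now - t > TIMEOUT]

last_seen = {}
touch(last_seen, "tom", 0)
touch(last_seen, "tom", 2)          # new data at t=2 resets the idle clock
print(expired_keys(last_seen, 4))   # [] - tom has been idle only 2s
print(expired_keys(last_seen, 6))   # ['tom'] - idle 4s > 3s, now eligible
```

Note that at t=4 tom has existed for 4 seconds but is not expired, because the clock restarted at t=2; a fixed-TTL reading of timeout() would get this wrong.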

Write the conclusion first.

After calling timeout(), you also need the following line so that idle keys are actually cleared at that interval:

stream.checkpoint(Seconds(6))

During the checkpoint, a full scan is enabled that cleans the invalid keys out of the state.

Test

val conf = new SparkConf().setMaster("local[2]").setAppName("state")
val ssc = new StreamingContext(conf, Seconds(3))
ssc.checkpoint("./tmp")

val streams: DStream[(String, Int)] = ssc.socketTextStream("localhost", 9999).map(x => (x, 1))

val result = streams.mapWithState(StateSpec.function(
  (k: String, v: Option[Int], state: State[Int]) => {
    val count = state.getOption().getOrElse(0)
    println(k)
    println(v)
    var sum = 0
    if (!state.isTimingOut()) {
      sum = count + v.get
      state.update(sum)
    } else {
      println("timeout")
    }
    Option(sum)
  }
).timeout(Seconds(3)))  // this line of code is the key
// result.checkpoint(Seconds(6))  // this line triggers the cleaning mechanism
result.print()

ssc.start()
ssc.awaitTermination()

Running the above code with the expiration time set to 3s, I found that the key does not expire and get cleared after 3 seconds; it is only cleared after about 30 seconds.

After entering a single tom on port 9999 and taking no further action, the test output is as follows:

tom
Some(1)
-------------------------------------------
Time: 1618228587000 ms
-------------------------------------------
Some(1)

tom
None
timeout
-------------------------------------------
Time: 1618228614000 ms
-------------------------------------------
Some(0)

As can be seen from the test results, it took about 27s from input to cleanup.

Now uncomment that commented-out line so that we checkpoint every 6s, and type tom again:

tom
Some(1)
-------------------------------------------
Time: 1618228497000 ms
-------------------------------------------
Some(1)

tom
None
timeout
-------------------------------------------
Time: 1618228506000 ms
-------------------------------------------
Some(0)

This time it took 9 seconds from creation to cleanup, which is exactly the expiration time (3s) plus the next checkpoint interval (6s) triggering the checkpoint.

Conjecture

When I first learned state operations, I wondered how to get rid of expired keys. I could not make it work through timeout() and could not find a solution online, so I set the problem aside for the time being. Later I came back to it, and then guessed and verified according to my own ideas.

1. The first thing to look at is the return value of mapWithState()

2. MapWithStateDStreamImpl

Each DStream's computation logic lives in compute(). Here, the internalStream's getOrCompute() is called; following the inheritance relationship, this resolves to the parent class DStream's method:

getOrCompute() mainly does three things: compute, cache, and checkpoint. Only a few points need to be remembered here: checkpointDuration, the checkpoint interval, and the call to checkpoint(). The real computation happens in compute(), so let's look at compute() next.

3. InternalMapWithStateDStream

compute() also calls getOrCompute(), which is in fact the same DStream method seen above. What matters here is the state RDD generated by createFromRDD().

4. MapWithStateRDD

This StateRDD is a collection of data that participates in state computation. First, look at how it is generated:

Let's take a look at how StateRDD's compute () is calculated:

As you can see from compute(), the removal of expired keys is triggered only when doFullScan is true; updateRecordWithData() is responsible for the full scan that removes expired keys.

There it is! The idea is clear: as long as we find a way to turn on the full scan, we can trigger the clearing mechanism ourselves.

So, let's first look at the default value of doFullScan:

It is not enabled by default. Next, use the IDE's shortcut keys to see where doFullScan is used:

As you can see from the figure, doFullScan is modified in two places in the code; let's find both:

The first can basically be ruled out, which leaves only the second: checkpoint(). What we need to know is that state operations must be checkpointed.

Remember getOrCompute() from step 2: it calls checkpoint() when checkpointDuration is not null.

Let's take a look at how the InternalMapWithStateDStream from step 3 defines this duration:

As shown in the figure, slideDuration is the window (batch) time, and multiplying it by the coefficient 10 gives the default checkpoint duration. So when I set the batch to 3s, the checkpoint period is 30s, and that is when the expired keys get cleaned up.
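The arithmetic behind the observed delays can be captured in a tiny sketch. This is a toy Python restatement of the rule just described (default = slide duration x 10, overridden by an explicit checkpoint(interval)); the multiplier constant and function names are illustrative.

```python
# Sketch of how the default checkpoint interval is derived for the state
# stream, per the walkthrough: slideDuration * a coefficient of 10, unless
# the user calls checkpoint(interval) explicitly.

DEFAULT_CHECKPOINT_MULTIPLIER = 10  # coefficient described in the article

def default_checkpoint_interval(slide_duration_s, override_s=None):
    """An explicit checkpoint(interval) wins; otherwise slide * 10."""
    if override_s is not None:
        return override_s
    return slide_duration_s * DEFAULT_CHECKPOINT_MULTIPLIER

print(default_checkpoint_interval(3))     # 30 - default, hence the ~30s purge
print(default_checkpoint_interval(3, 6))  # 6  - stream.checkpoint(Seconds(6))
```

With a 3s batch this yields the 30s cleanup cadence seen in the first test, and the explicit 6s override yields the faster cleanup seen in the second.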

The checkpoint interval can be set via checkpoint(interval), which overrides the default 30s in the program above.

5. MapWithStateRDDRecord

Finally, the full scan is carried out in this class. First, let's look at the comment introducing this Record:

It says that the Record is responsible for storing the key-value state of the MapWithStateRDD, and updateRecordWithData() is responsible for cleaning up expired records. Let's look at that method's implementation:

removeTimedoutData controls whether the full scan is enabled; it is the value of doFullScan.
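The shape of that cleanup step can be sketched as follows. This is a toy Python version of the behavior described for updateRecordWithData(), with the doFullScan gate made explicit; all names and the tuple-based state layout are illustrative, not Spark's actual code.

```python
# Toy version of the cleanup described for updateRecordWithData(): only
# when removeTimedoutData (i.e. doFullScan) is true does it walk the whole
# state map and drop idle keys, giving the user function one last call for
# each (the call where isTimingOut would be true).

TIMEOUT = 3  # seconds of allowed idleness

def update_record(state, batch, now, remove_timedout_data):
    """state maps key -> (count, last_seen). Returns keys timed out this pass."""
    timed_out = []
    for k, v in batch:
        cnt, _ = state.get(k, (0, now))
        state[k] = (cnt + v, now)              # update value and idle clock
    if remove_timedout_data:                   # the doFullScan gate
        for k, (cnt, seen) in list(state.items()):
            if now - seen > TIMEOUT:
                timed_out.append(k)            # last call with isTimingOut=True
                del state[k]
    return timed_out

state = {}
update_record(state, [("tom", 1)], now=0, remove_timedout_data=False)
# 27s later, without the full scan nothing is removed despite long idleness
print(update_record(state, [], now=27, remove_timedout_data=False))  # []
# with the scan enabled, tom is finally purged
print(update_record(state, [], now=27, remove_timedout_data=True))   # ['tom']
```

This mirrors the observed behavior exactly: the key can be long past its timeout, but nothing happens until a checkpoint flips the flag and the full scan runs.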

The above is how to solve the problem that setting timeout() does not take effect when Spark Streaming uses mapWithState. The editor believes these are knowledge points we may see or use in daily work, and hopes you can learn more from this article.
