How updateStateByKey and mapWithState are implemented

2025-01-16 Update From: SLTechnology News&Howtos

Shulou (Shulou.com) 05/31

This article explains how updateStateByKey and mapWithState are implemented in Spark Streaming by walking through the relevant source code.

updateStateByKey and mapWithState are not defined on DStream itself. They become available through an implicit conversion.

The conversion wraps a DStream of key-value pairs in PairDStreamFunctions, so both methods are ultimately implemented by PairDStreamFunctions.
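The implicit-conversion pattern can be sketched in plain Scala without Spark. In this minimal sketch, MiniDStream, MiniPairFunctions, toMiniPairFunctions and countStateByKey are hypothetical names invented for illustration; they only mimic the relationship between DStream and PairDStreamFunctions and are not Spark's actual code.

```scala
import scala.language.implicitConversions

object ImplicitConversionSketch {
  // Hypothetical stand-in for DStream: it has no key-based state methods of its own.
  final class MiniDStream[T](val batches: Seq[Seq[T]])

  // Hypothetical stand-in for PairDStreamFunctions: the extra methods live here.
  final class MiniPairFunctions[K, V](self: MiniDStream[(K, V)]) {
    // Running count per key, reported after each batch.
    def countStateByKey(): Seq[Map[K, Int]] =
      self.batches
        .scanLeft(Map.empty[K, Int]) { (state, batch) =>
          batch.foldLeft(state) { case (acc, (k, _)) =>
            acc.updated(k, acc.getOrElse(k, 0) + 1)
          }
        }
        .tail
  }

  // The implicit conversion: it only applies to streams of (K, V) pairs.
  implicit def toMiniPairFunctions[K, V](s: MiniDStream[(K, V)]): MiniPairFunctions[K, V] =
    new MiniPairFunctions(s)

  def demo(): Seq[Map[String, Int]] = {
    val stream = new MiniDStream(Seq(Seq("a" -> 1, "b" -> 1), Seq("a" -> 1)))
    // countStateByKey is not a member of MiniDStream; the compiler inserts the conversion.
    stream.countStateByKey()
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

Because the conversion is declared only for streams of pairs, calling countStateByKey on a MiniDStream[Int] would not compile, which is the same reason the keyed state methods appear only on pair DStreams.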

updateStateByKey

newUpdateFunc wraps the user-supplied update function. It defines how the new state for each key is computed from that key's existing state and its new values.
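A rough sketch of that wrapping step, in plain Scala: a per-key function of type (Seq[V], Option[S]) => Option[S] is lifted into a function over an iterator of (key, new values, previous state) triples. The names liftUpdateFunc and runningSum are assumptions made for this example, not identifiers from the Spark source.

```scala
object UpdateFuncSketch {
  // Lift a per-key update function into one over (key, new values, previous state)
  // triples. Keys whose update returns None are dropped from the result.
  def liftUpdateFunc[K, V, S](
      updateFunc: (Seq[V], Option[S]) => Option[S]
  ): Iterator[(K, Seq[V], Option[S])] => Iterator[(K, S)] =
    iterator => iterator.flatMap { case (key, values, prev) =>
      updateFunc(values, prev).map(state => (key, state))
    }

  // Example user function: running sum; returning None removes the key.
  val runningSum: (Seq[Int], Option[Int]) => Option[Int] =
    (values, prev) => {
      val total = prev.getOrElse(0) + values.sum
      if (total == 0) None else Some(total)
    }

  def demo(): List[(String, Int)] = {
    val lifted = liftUpdateFunc[String, Int, Int](runningSum)
    lifted(Iterator(
      ("a", Seq(1, 2), Some(10)), // existing key with new values
      ("b", Seq.empty, Some(5)),  // existing key without new values
      ("c", Seq(4), None),        // new key
      ("d", Seq(-3), Some(3))     // sum becomes 0, so the key is removed
    )).toList
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

Note that key "b" is still passed through the update function even though it received no new values in this batch.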

defaultPartitioner() returns the default partitioner, which fixes the default number of partitions.
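To make the role of the partition count concrete, here is a minimal sketch of hash-based key placement, assuming the default partitioner is hash-based. partitionFor is a hypothetical helper written for this example; it is not a Spark API.

```scala
object PartitionSketch {
  // Non-negative modulo, needed because hashCode may be negative.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Hypothetical helper: the partition a key lands in under hash partitioning.
  def partitionFor(key: Any, numPartitions: Int): Int =
    if (key == null) 0 else nonNegativeMod(key.hashCode, numPartitions)

  def main(args: Array[String]): Unit =
    Seq("spark", "streaming", "state").foreach { k =>
      println(s"$k -> partition ${partitionFor(k, 4)}")
    }
}
```

The same key always maps to the same partition, which is what lets the state for a key be found again in the next batch.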

The critical line in updateStateByKey is the construction of a StateDStream:

    new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None)

StateDStream extends DStream.

StateDStream persists itself in memory.

Its compute method does the main work: if the parent DStream has an RDD for the current batch, the computeUsingPreviousRDD method is executed.

The code of this method contains a performance bottleneck.

On every update, the parent RDD is cogrouped with the entire previous state RDD. As the program keeps running and the state grows, each batch becomes slower and slower. Avoid this method where possible!
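The cost of the cogroup approach can be illustrated with a small plain-Scala model. This is a sketch under simplifying assumptions (an in-memory Map instead of RDDs, and a hypothetical cogroupUpdate function); it only counts how often the update function runs.

```scala
object CogroupCostSketch {
  // One batch of a cogroup-style update: every key present in either the previous
  // state or the new batch is visited, even if it received no new data.
  // Returns the new state and the number of update-function calls made.
  def cogroupUpdate[K, V, S](
      prevState: Map[K, S],
      batch: Seq[(K, V)],
      updateFunc: (Seq[V], Option[S]) => Option[S]
  ): (Map[K, S], Int) = {
    val grouped: Map[K, Seq[V]] =
      batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    val allKeys = prevState.keySet ++ grouped.keySet
    var calls = 0
    val newState = allKeys.iterator.flatMap { k =>
      calls += 1
      updateFunc(grouped.getOrElse(k, Seq.empty), prevState.get(k)).map(s => k -> s)
    }.toMap
    (newState, calls)
  }

  def demo(): (Int, Int) = {
    val sum: (Seq[Int], Option[Int]) => Option[Int] =
      (vs, prev) => Some(prev.getOrElse(0) + vs.sum)
    // Batch 1 introduces 1000 keys; batch 2 touches just one of them.
    val (s1, calls1) = cogroupUpdate(Map.empty[Int, Int], (1 to 1000).map(_ -> 1), sum)
    val (_, calls2) = cogroupUpdate(s1, Seq(1 -> 1), sum)
    (calls1, calls2)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

In this model the second batch contains a single record, yet the update function still runs once per key held in state, so the per-batch work tracks the total state size rather than the batch size.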

mapWithState

The mapWithState method returns a MapWithStateDStream. Its implementation class is MapWithStateDStreamImpl, which in turn delegates to InternalMapWithStateDStream.

As with updateStateByKey, the stream is persisted in memory:

    persist(StorageLevel.MEMORY_ONLY)

Next, look at compute, the most important method inherited from DStream.

The RDD it ultimately operates on is MapWithStateRDD.

Each partition of this RDD is represented by a MapWithStateRDDRecord.

The companion object of MapWithStateRDDRecord has the method updateRecordWithData, which performs the state update. Unlike updateStateByKey, which runs a cogroup on every batch, it applies the update on top of the existing state and only visits the keys present in the batch, so efficiency is improved.

    def updateRecordWithData[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
      prevRecord: Option[MapWithStateRDDRecord[K, S, E]],
      dataIterator: Iterator[(K, V)],
      mappingFunction: (Time, K, Option[V], State[S]) => Option[E],
      batchTime: Time,
      timeoutThresholdTime: Option[Long],
      removeTimedoutData: Boolean
    ): MapWithStateRDDRecord[K, S, E] = {
      // Create a new state map by cloning the previous one (if it exists) or by creating an empty one
      val newStateMap = prevRecord.map { _.stateMap.copy() }.getOrElse { new EmptyStateMap[K, S]() }

      val mappedData = new ArrayBuffer[E]
      val wrappedState = new StateImpl[S]()

      // Call the mapping function on each record in the data iterator, and accordingly
      // update the states touched, and collect the data returned by the mapping function
      dataIterator.foreach { case (key, value) =>
        wrappedState.wrap(newStateMap.get(key))
        val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
        if (wrappedState.isRemoved) {
          newStateMap.remove(key)
        } else if (wrappedState.isUpdated
            || (wrappedState.exists && timeoutThresholdTime.isDefined)) {
          newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
        }
        mappedData ++= returned
      }

      // Get the timed out state records, call the mapping function on each and collect the
      // data returned
      if (removeTimedoutData && timeoutThresholdTime.isDefined) {
        newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
          wrappedState.wrapTimingOutState(state)
          val returned = mappingFunction(batchTime, key, None, wrappedState)
          mappedData ++= returned
          newStateMap.remove(key)
        }
      }

      MapWithStateRDDRecord(newStateMap, mappedData)
    }
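The core idea of updateRecordWithData can be modeled in plain Scala. The sketch below is a simplified illustration, not the Spark implementation: MiniState and updateWithData are hypothetical names, a mutable Map stands in for the state map, and batch time and timeout handling are left out.

```scala
import scala.collection.mutable

object MapWithStateSketch {
  // Hypothetical, stripped-down stand-in for the wrapped state handle
  // that is passed to the mapping function.
  final class MiniState[S](private var value: Option[S]) {
    private var updated = false
    private var removed = false
    def getOption: Option[S] = value
    def update(s: S): Unit = { value = Some(s); updated = true; removed = false }
    def remove(): Unit = { value = None; removed = true; updated = false }
    def isUpdated: Boolean = updated
    def isRemoved: Boolean = removed
  }

  // Visit only the keys that appear in the batch; untouched state is carried over as-is.
  // Returns the emitted records and the number of mapping-function calls.
  def updateWithData[K, V, S, E](
      stateMap: mutable.Map[K, S],
      batch: Iterator[(K, V)],
      mappingFunction: (K, Option[V], MiniState[S]) => Option[E]
  ): (Vector[E], Int) = {
    val emitted = Vector.newBuilder[E]
    var calls = 0
    batch.foreach { case (key, value) =>
      val state = new MiniState[S](stateMap.get(key))
      calls += 1
      emitted ++= mappingFunction(key, Some(value), state)
      if (state.isRemoved) stateMap.remove(key)
      else if (state.isUpdated) state.getOption.foreach(s => stateMap.put(key, s))
    }
    (emitted.result(), calls)
  }

  def demo(): (Vector[(Int, Int)], Int, Int) = {
    val state = mutable.Map.empty[Int, Int]
    val runningSum: (Int, Option[Int], MiniState[Int]) => Option[(Int, Int)] =
      (key, value, st) => {
        val total = st.getOption.getOrElse(0) + value.getOrElse(0)
        st.update(total)
        Some(key -> total)
      }
    // Batch 1 seeds 1000 keys; batch 2 touches just one of them.
    updateWithData(state, (1 to 1000).iterator.map(_ -> 1), runningSum)
    val (out, calls) = updateWithData(state, Iterator(1 -> 1), runningSum)
    (out, calls, state.size)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

In this model the second batch triggers a single mapping-function call while the other 999 keys stay in state untouched, which is the behavior the loop over dataIterator in the code above is designed for.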

Thank you for reading. That covers how updateStateByKey and mapWithState are implemented. How they behave in a specific workload still needs to be verified in practice.
