This article explains how Spark Streaming implements updateStateByKey and mapWithState. The walkthrough follows the relevant source code and is kept simple, so it should be easy to read and understand.
Neither updateStateByKey nor mapWithState is defined on DStream itself; they become available through an implicit conversion. A DStream of key-value pairs is implicitly converted to PairDStreamFunctions, and that is where both methods are ultimately implemented.
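As a quick, hedged illustration (the function name runningCount and the counting logic are mine, not from the article), a DStream whose elements are key-value pairs picks up these methods through the implicit conversion DStream.toPairDStreamFunctions:

import org.apache.spark.streaming.dstream.DStream

// Because the element type is a pair, the implicit conversion
// DStream.toPairDStreamFunctions wraps the stream in PairDStreamFunctions,
// which is where updateStateByKey and mapWithState are actually defined.
def runningCount(pairs: DStream[(String, Int)]): DStream[(String, Int)] =
  pairs.updateStateByKey[Int] { (newValues: Seq[Int], prevState: Option[Int]) =>
    Some(newValues.sum + prevState.getOrElse(0))
  }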
updateStateByKey
newUpdateFunc adapts the user-supplied update function, i.e. the function that describes how each key's state is updated on top of its previous value (see the sketch below).
defaultPartitioner() supplies the default partitioner, which determines the number of partitions used for the state.
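To make that concrete, here is a standalone sketch (the helper name adaptUpdateFunc is mine, not Spark's) of the adaptation newUpdateFunc performs: a per-key update function is lifted to one that works on an iterator of (key, new values, previous state) triples, dropping keys whose update returns None.

// Standalone sketch of what newUpdateFunc does inside PairDStreamFunctions.
def adaptUpdateFunc[K, V, S](
    updateFunc: (Seq[V], Option[S]) => Option[S]
): Iterator[(K, Seq[V], Option[S])] => Iterator[(K, S)] = { iterator =>
  iterator.flatMap { case (key, newValues, prevState) =>
    updateFunc(newValues, prevState).map(state => (key, state))
  }
}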
There is one crucial detail in the following line:
new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None)
StateDStream extends DStream and persists itself in memory by default.
Its compute path follows a general pattern: if a state RDD from the previous batch exists, computeUsingPreviousRDD is executed.
This method is where the performance bottleneck lies: on every batch, the new data is cogrouped with the entire previous state RDD (see the sketch below). As the job keeps running and the state keeps growing, that cogroup becomes more and more expensive, so processing gets slower and slower. Avoid updateStateByKey where you can.
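The pattern looks roughly like the hypothetical helper below (updateViaCogroup is my own name, not a Spark method); it mirrors what computeUsingPreviousRDD does and makes the cost visible: the cogroup touches every key ever seen, not just the keys in the new batch.

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// The full previous state RDD is cogrouped with the new batch on every interval,
// so the per-batch cost grows with the total number of tracked keys.
def updateViaCogroup[K: ClassTag, V: ClassTag, S: ClassTag](
    parentRDD: RDD[(K, V)],       // new data arriving in this batch
    prevStateRDD: RDD[(K, S)],    // complete state carried over from the previous batch
    updateFunc: (Seq[V], Option[S]) => Option[S]): RDD[(K, S)] = {
  parentRDD.cogroup(prevStateRDD).flatMap { case (key, (newValues, oldState)) =>
    updateFunc(newValues.toSeq, oldState.headOption).map(state => (key, state))
  }
}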
mapWithState
The return value of the mapWithState method is a MapWithStateDStream.
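A hedged usage sketch first (the function name runningSum and the summing logic are mine, not from the article): the per-key mapping function is wrapped in a StateSpec, and the call returns a MapWithStateDStream.

import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.{DStream, MapWithStateDStream}

// `words` is assumed to be a DStream[(String, Int)] built elsewhere.
def runningSum(words: DStream[(String, Int)]): MapWithStateDStream[String, Int, Long, (String, Long)] = {
  val spec = StateSpec.function { (key: String, value: Option[Int], state: State[Long]) =>
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
    state.update(sum)   // write the new running total back into the state store
    (key, sum)          // element emitted for this batch
  }
  words.mapWithState(spec)
}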
Let's look at its implementation class, MapWithStateDStreamImpl, which ultimately builds and returns an InternalMapWithStateDStream.
Like the state DStream behind updateStateByKey, it persists itself in memory:
persist(StorageLevel.MEMORY_ONLY)
Next, look at the most important method it overrides from DStream: compute.
The final work is done in an RDD: MapWithStateRDD.
Each partition of this RDD is represented by a MapWithStateRDDRecord.
The MapWithStateRDDRecord companion object provides updateRecordWithData, the method that performs the state update. Unlike updateStateByKey, which cogroups the whole state with every new batch, it updates only the keys touched by the new data on top of the existing state map, which is much more efficient:
private[streaming] object MapWithStateRDDRecord {
  def updateRecordWithData[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
      prevRecord: Option[MapWithStateRDDRecord[K, S, E]],
      dataIterator: Iterator[(K, V)],
      mappingFunction: (Time, K, Option[V], State[S]) => Option[E],
      batchTime: Time,
      timeoutThresholdTime: Option[Long],
      removeTimedoutData: Boolean
    ): MapWithStateRDDRecord[K, S, E] = {

    // Create a new state map by cloning the previous one (if it exists) or by creating an empty one
    val newStateMap = prevRecord.map { _.stateMap.copy() }.getOrElse { new EmptyStateMap[K, S]() }

    val mappedData = new ArrayBuffer[E]
    val wrappedState = new StateImpl[S]()

    // Call the mapping function on each record in the data iterator, and accordingly
    // update the states touched, and collect the data returned by the mapping function
    dataIterator.foreach { case (key, value) =>
      wrappedState.wrap(newStateMap.get(key))
      val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
      if (wrappedState.isRemoved) {
        newStateMap.remove(key)
      } else if (wrappedState.isUpdated
          || (wrappedState.exists && timeoutThresholdTime.isDefined)) {
        newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
      }
      mappedData ++= returned
    }

    // Get the timed out state records, call the mapping function on each and collect the
    // data returned
    if (removeTimedoutData && timeoutThresholdTime.isDefined) {
      newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
        wrappedState.wrapTimingOutState(state)
        val returned = mappingFunction(batchTime, key, None, wrappedState)
        mappedData ++= returned
        newStateMap.remove(key)
      }
    }

    MapWithStateRDDRecord(newStateMap, mappedData)
  }
}
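The timeout branch at the end of updateRecordWithData only runs when a timeout has been configured on the StateSpec. A hedged usage sketch (the 10-minute value and summing logic are arbitrary choices of mine): keys idle longer than the timeout are passed to the mapping function one last time with value = None and isTimingOut == true, and are then removed from the state map.

import org.apache.spark.streaming.{Minutes, State, StateSpec}

val specWithTimeout = StateSpec.function { (key: String, value: Option[Int], state: State[Long]) =>
  if (state.isTimingOut()) {
    (key, state.getOption.getOrElse(0L))   // final emission for an expiring key; updating it is not allowed
  } else {
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
    state.update(sum)
    (key, sum)
  }
}.timeout(Minutes(10))   // drives timeoutThresholdTime / removeTimedoutData on each batch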
That is how updateStateByKey and mapWithState are implemented. The details above follow the Spark Streaming source, and the behavior is best confirmed by experimenting in practice.