2025-04-02 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
Many people who are new to Spark Streaming are unsure how to implement its transformation operations. This article summarizes the available operations and shows how to use each one.
The transformation operations of a DStream fall into two groups: stateless and stateful.
In a stateless transformation, the processing of each batch does not depend on the data of any previous batch.
A stateful transformation uses data or intermediate results from previous batches to compute the result for the current batch. Stateful transformations include those based on a sliding window and those that track state changes over time.
Stateless transformations
A stateless transformation simply applies an ordinary RDD transformation to each batch, that is, to every RDD in the DStream.
transform operator
transform lets you apply an arbitrary RDD-to-RDD function to a DStream. This makes it possible to use RDD operations that the DStream API does not expose, so it is an easy way to extend what a DStream can do. The function is invoked once per batch, and in effect applies the transformation to each RDD in the DStream.
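To make the "once per batch" behaviour concrete, here is a minimal sketch in plain Scala with no Spark dependency. It only models the idea: a `Batch` stands in for one RDD, a sequence of batches stands in for the DStream, and `TransformSketch`, `transform` and `run` are illustrative names, not Spark APIs. Sorting is used as the per-batch step because it is an RDD-level operation that a DStream does not offer directly.

```scala
// Plain-Scala model of transform; an illustration only, not Spark code.
object TransformSketch {
  // A "batch" plays the role of one RDD; a stream is a sequence of batches.
  type Batch = Seq[String]

  // Mimics DStream.transform: the supplied function is invoked once per batch.
  def transform(stream: Seq[Batch])(f: Batch => Batch): Seq[Batch] =
    stream.map(f)

  // Returns the transformed stream plus how often each level of code ran.
  def run(stream: Seq[Batch]): (Seq[Batch], Int, Int) = {
    var perBatchCalls = 0  // runs of the outer function
    var perRecordCalls = 0 // runs of the inner function

    val result = transform(stream) { batch =>
      perBatchCalls += 1        // once per batch (Driver side in Spark)
      batch.sorted.map { s =>   // sort each batch, then map its records
        perRecordCalls += 1     // once per record (Executor side in Spark)
        s.toUpperCase
      }
    }
    (result, perBatchCalls, perRecordCalls)
  }

  def main(args: Array[String]): Unit = {
    val (result, batches, records) = run(Seq(Seq("b", "a"), Seq("d", "c", "e")))
    println(result)
    println(s"per-batch calls: $batches, per-record calls: $records")
  }
}
```

With two batches holding five records in total, the outer function runs twice and the inner function runs five times, which mirrors the split between periodically executed driver code and per-record executor code.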
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

def main(args: Array[String]): Unit = {
  val conf: SparkConf = new SparkConf().setAppName("transform").setMaster("local[*]")
  val sc: StreamingContext = new StreamingContext(conf, Seconds(3))
  val lines = sc.socketTextStream("localhost", 9999)

  // transform gives access to the underlying RDD. Use it when:
  // 1. the DStream API lacks the operation you need
  // 2. some code has to run periodically (once per batch)

  // Code here: Driver side (runs once)
  val newDS: DStream[String] = lines.transform(rdd => {
    // Code here: Driver side (runs once per batch)
    rdd.map(str => {
      // Code here: Executor side
      str
    })
  })

  // Code here: Driver side (runs once)
  val newDS1: DStream[String] = lines.map(data => {
    // Code here: Executor side
    data
  })

  // At least one output operation must be registered, otherwise start() fails
  newDS.print()
  newDS1.print()

  sc.start()
  sc.awaitTermination()
}

join operator
Joining two streams requires both streams to have the same batch interval, so that their computations are triggered at the same time. For each batch, the RDDs that the two streams hold for that batch are joined, exactly as two ordinary RDDs would be.
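The per-batch nature of the join can be sketched in plain Scala, again without Spark. This is a model under stated assumptions: each inner `Seq` stands in for the RDD of one batch, and `JoinSketch`, `joinBatch` and `joinStreams` are illustrative names only.

```scala
// Plain-Scala model of a stream-to-stream join; an illustration only, not Spark code.
object JoinSketch {
  type Batch = Seq[(String, Int)]

  // Inner join of two batches by key, like joining two pair RDDs.
  def joinBatch(left: Batch, right: Batch): Seq[(String, (Int, Int))] =
    for {
      (k1, v1) <- left
      (k2, v2) <- right
      if k1 == k2
    } yield (k1, (v1, v2))

  // Batch i of one stream is only ever joined with batch i of the other.
  def joinStreams(a: Seq[Batch], b: Seq[Batch]): Seq[Seq[(String, (Int, Int))]] =
    a.zip(b).map { case (l, r) => joinBatch(l, r) }

  def main(args: Array[String]): Unit = {
    val s9999: Seq[Batch] = Seq(Seq("a" -> 9, "b" -> 9), Seq("c" -> 9))
    val s8888: Seq[Batch] = Seq(Seq("a" -> 8), Seq("b" -> 8, "c" -> 8))
    joinStreams(s9999, s8888).foreach(println)
  }
}
```

In this sample the key "b" arrives in the first batch of one stream and the second batch of the other, so it never appears in the joined output. Only keys that arrive in the same batch on both sides are matched.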
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
  val ssc = new StreamingContext(sparkConf, Seconds(5))

  val data9999 = ssc.socketTextStream("localhost", 9999)
  val data8888 = ssc.socketTextStream("localhost", 8888)

  val map9999: DStream[(String, Int)] = data9999.map((_, 9))
  val map8888: DStream[(String, Int)] = data8888.map((_, 8))

  // A DStream join is really a join of the two underlying RDDs of each batch
  val joinDS: DStream[(String, (Int, Int))] = map9999.join(map8888)
  joinDS.print()

  ssc.start()
  ssc.awaitTermination()
}

Stateful transformations
A stateful transformation tracks data across time intervals: data from previous batches is also used to compute the results of new batches. There are two main kinds of stateful transformation:
Sliding window: operate on a span of time that moves forward as a sliding window
updateStateByKey(): track how the state of the data changes for each key
Stateful transformations require checkpointing to be enabled on the StreamingContext for fault tolerance.
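What "tracking state per key" means can be sketched in plain Scala without Spark. This is a model, not Spark's implementation: `StateSketch` and `runningState` are illustrative names, and the `update` function merely has the same shape as the function passed to updateStateByKey (the new values for a key plus the previous state, returning the new state).

```scala
// Plain-Scala model of per-key state tracking; an illustration only, not Spark code.
object StateSketch {
  type Batch = Seq[(String, Int)]

  // Same shape as an updateStateByKey function: new values + old state => new state.
  def update(values: Seq[Int], state: Option[Int]): Option[Int] =
    Some(state.getOrElse(0) + values.sum)

  // Returns the state map as it stands after each batch.
  def runningState(stream: Seq[Batch]): Seq[Map[String, Int]] =
    stream.scanLeft(Map.empty[String, Int]) { (state, batch) =>
      // Group this batch's values by key.
      val grouped: Map[String, Seq[Int]] =
        batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
      // Every key seen so far is updated, even if it has no new values.
      val keys = state.keySet ++ grouped.keySet
      keys.flatMap { k =>
        update(grouped.getOrElse(k, Seq.empty), state.get(k)).map(k -> _)
      }.toMap
    }.tail // drop the initial empty state

  def main(args: Array[String]): Unit = {
    val stream: Seq[Batch] = Seq(
      Seq("a" -> 1, "b" -> 1, "a" -> 1),
      Seq("b" -> 1),
      Seq("a" -> 1, "c" -> 1)
    )
    runningState(stream).foreach(println)
  }
}
```

For the sample input the count for "a" goes from 2 to 2 to 3 across the three batches. The middle batch contains no "a" at all, yet its earlier count is carried forward, which is exactly what a stateless per-batch operation cannot do.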
updateStateByKey

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

def main(args: Array[String]): Unit = {
  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("updateStateByKey")
  val sc: StreamingContext = new StreamingContext(conf, Seconds(4))
  // Stateful operations need a checkpoint directory
  sc.checkpoint("cp")

  val ds: ReceiverInputDStream[String] = sc.socketTextStream("localhost", 9999)
  val value: DStream[(String, Int)] = ds.map((_, 1))

  // updateStateByKey updates the state of the data per key.
  // The function it takes has two parameters:
  //   the first is the collection of values in this batch for a given key
  //   the second is the previously stored state for that key
  val state: DStream[(String, Int)] = value.updateStateByKey(
    (seq: Seq[Int], option: Option[Int]) => {
      val newCount: Int = option.getOrElse(0) + seq.sum
      Option(newCount)
    })
  state.print()

  sc.start()
  sc.awaitTermination()
}

Window
All window-based functions take two parameters, the window length and the slide interval. Both must be integer multiples of the batch interval of the StreamingContext.
The window length controls how many of the most recent batches are included in each computation.
The slide interval controls how often the new DStream computes a result.
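How the two parameters interact can be sketched in plain Scala. This is a model under assumptions, not Spark code: batches are identified by a zero-based index, and `WindowSketch`, `batchesPer` and `windows` are illustrative names. As in Spark, the earliest windows cover fewer batches because not enough data has arrived yet.

```scala
// Plain-Scala model of window length and slide interval; an illustration only.
object WindowSketch {
  // Converts a duration into a number of batches; it must divide evenly.
  def batchesPer(durationSec: Int, batchSec: Int): Int = {
    require(durationSec % batchSec == 0,
      s"$durationSec s is not a multiple of the $batchSec s batch interval")
    durationSec / batchSec
  }

  // For each slide, lists the batch indices that fall inside the window.
  def windows(numBatches: Int, windowBatches: Int, slideBatches: Int): List[List[Int]] =
    (slideBatches to numBatches by slideBatches).map { end =>
      (math.max(0, end - windowBatches) until end).toList
    }.toList

  def main(args: Array[String]): Unit = {
    val batchSec = 3
    // 9 s window sliding every 3 s: overlapping windows of up to three batches.
    println(windows(5, batchesPer(9, batchSec), batchesPer(3, batchSec)))
    // 6 s window sliding every 6 s: non-overlapping windows of two batches.
    println(windows(4, batchesPer(6, batchSec), batchesPer(6, batchSec)))
  }
}
```

With a 3-second batch interval, a 9-second window that slides every 3 seconds reuses two of its three batches on every step, whereas a 6-second window that slides every 6 seconds never sees a batch twice. A duration such as 4 seconds would be rejected by `batchesPer` here because it is not a multiple of the batch interval.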
window operation
Use window to count the words that fall within each window:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
  val ssc = new StreamingContext(sparkConf, Seconds(3))

  val lines = ssc.socketTextStream("localhost", 9999)
  val wordToOne = lines.map((_, 1))

  // Window length 6 s, slide interval 6 s (both multiples of the 3 s batch interval)
  val windowDS: DStream[(String, Int)] = wordToOne.window(Seconds(6), Seconds(6))
  val wordToCount = windowDS.reduceByKey(_ + _)
  wordToCount.print()

  ssc.start()
  ssc.awaitTermination()
}

reduce operation
Reduction with an inverse function is a more efficient form of windowed reduction. Instead of recomputing the whole window, Spark considers only the elements entering the window and those leaving it, and computes the reduced result incrementally. In code this shows up as the two functions reduceFunc and invReduceFunc.
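The incremental idea can be sketched in plain Scala without Spark. This is a model under assumptions: each batch has already been reduced to a single sum, the window slides one batch at a time, and `InverseReduceSketch`, `fullSums` and `incrementalSums` are illustrative names. The two function values play the roles of reduceFunc and invReduceFunc.

```scala
// Plain-Scala model of reduction with an inverse function; an illustration only.
object InverseReduceSketch {
  val reduceFunc: (Int, Int) => Int = (x, y) => x + y    // adds what enters the window
  val invReduceFunc: (Int, Int) => Int = (x, y) => x - y // removes what leaves it

  // Full recomputation: reduce every batch in the window at every step.
  def fullSums(batchSums: Seq[Int], window: Int): List[Int] =
    batchSums.indices.map { i =>
      batchSums.slice(math.max(0, i - window + 1), i + 1).reduce(reduceFunc)
    }.toList

  // Incremental: previous result + entering batch - leaving batch.
  def incrementalSums(batchSums: Seq[Int], window: Int): List[Int] =
    batchSums.indices.scanLeft(0) { (prev, i) =>
      val withNew = reduceFunc(prev, batchSums(i))
      if (i - window >= 0) invReduceFunc(withNew, batchSums(i - window))
      else withNew // nothing has left the window yet
    }.tail.toList

  def main(args: Array[String]): Unit = {
    val batchSums = Seq(3, 1, 4, 1, 5)
    println(fullSums(batchSums, 3))
    println(incrementalSums(batchSums, 3))
  }
}
```

Both functions produce the same window results for the sample data (3, 4, 8, 6, 10), but the incremental version touches at most two batches per step regardless of the window length. The approach only works when the reduce function has a true inverse, as addition does with subtraction.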
General reduction operation
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
  val ssc = new StreamingContext(sparkConf, Seconds(3))
  ssc.checkpoint("cp")

  val lines = ssc.socketTextStream("localhost", 9999)

  // Concatenate all lines in a 9 s window, recomputed every 3 s
  lines.reduceByWindow(
    (x: String, y: String) => x + "-" + y,
    Seconds(9), Seconds(3)
  ).print()

  ssc.start()
  ssc.awaitTermination()
}
Reduction with an inverse function
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
  val ssc = new StreamingContext(sparkConf, Seconds(3))
  // The inverse-function variant requires checkpointing
  ssc.checkpoint("cp")

  val lines = ssc.socketTextStream("localhost", 9999)
  val wordToOne = lines.map((_, 1))

  // Window-based inverse reduction: improves performance by handling only
  // the elements that flow into and out of the window
  val windowDS: DStream[(String, Int)] = wordToOne.reduceByKeyAndWindow(
    (x: Int, y: Int) => x + y, // reduceFunc: add elements entering the window
    (x: Int, y: Int) => x - y, // invReduceFunc: subtract elements leaving it
    Seconds(9), Seconds(3))
  windowDS.print()

  ssc.start()
  ssc.awaitTermination()
}

count operation

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
  val ssc = new StreamingContext(sparkConf, Seconds(3))
  ssc.checkpoint("cp")

  val lines = ssc.socketTextStream("localhost", 9999)

  // Count the records that arrived within the window.
  // For example, if 10 records are entered within 3 seconds, this prints 10.
  val countByWindow: DStream[Long] = lines.countByWindow(Seconds(9), Seconds(3))
  countByWindow.print()

  // Count the occurrences of each distinct value within the window.
  // For example, if 1 3 2 4 3 5 are entered one per line within 3 seconds,
  // this prints (1,1), (2,1), (3,2), (4,1) and (5,1); the order may vary.
  val countByValueAndWindow: DStream[(String, Long)] =
    lines.countByValueAndWindow(Seconds(9), Seconds(3))
  countByValueAndWindow.print()

  ssc.start()
  ssc.awaitTermination()
}

That covers the main ways to implement transformation operations in Spark Streaming. Thank you for reading!