Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

The actual combat case of SparkStreaming

2025-02-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

Don't talk too much nonsense, just go to the practical information!

Related dependencies:

UTF8 1.8 UTF-8 2.11.8 2.3.2 2.7.6 2.11 org.scala-lang scala-library ${scala.version} org.apache.spark spark-core_2.11 ${spark.version} org.apache.spark spark-sql_2.11 ${spark.version} Org.apache.spark spark-streaming_2.11 ${spark.version} org.apache.spark spark-streaming-kafka-0-8pm 2.11 ${spark.version} org.scalikejdbc scalikejdbc_2.11 3.2.0 org.apache.curator curator-recipes 2.8.0 Junit junit 4.12 compile (1) spark streaminging stateless computing WordCount

Programming architecture:

Start nc-lk 9999 on a node and use it as a data source. Write programs to realize the wordcount of the network.

Code implementation:

Object NetWordCount {/ * programming routine: * 1. Get the programming entry, StreamingContext * 2. Build the first DStream * 3 through StreamingContext. Perform various transformation operations on DStream * 4. Perform output operations on the data results * 5. Submit sparkStreaming application * / def main (args: Array [String]): Unit = {/ / block log Logger.getLogger ("org.apache.hadoop") .setLevel (Level.ERROR) Logger.getLogger ("org.apache.zookeeper") .setLevel (Level.WARN) Logger.getLogger ("org.apache.hive") .setLevel (Level.WARN) / / 1. Get the programming entry, StreamingContext val conf= new SparkConf (). SetMaster ("local [2]") .setAppName ("NetWordCount") / / the second parameter indicating the batch length val ssc=new StreamingContext (conf,Seconds (2)) / * 2. Build the first DStream (read data over the network) * the first parameter: hostname * the second parameter: Port number * / val ReceiverInputDStream: ReceiverInputDStream [String] = ssc.socketTextStream ("test", 9999) / / 3. Perform various transformation operations on DStream val wordDS: DStream [String] = ReceiverInputDStream.flatMap (msg = > {msg.split ("\\ s +")}) val wordCountDS: DStream [(String, Int)] = wordDS.map (word= > (word,1)). ReduceByKey (_ + _) / / 4. Output the data results, here is the printout wordCountDS.print () / / 5. Submit the sparkStreaming application ssc.start () ssc.awaitTermination ()}}

Use nc-lk 9999 to send a message on the appropriate node (every other batch time) and view the console print:

Batch2

Batch3

The results show that: because the current operation is stateless, it is processed every 2 seconds, but the number of words each time will not be counted, that is, only the words of the current batch will be counted, but those entered before will not be counted.

(2) WordCount of spark streaminging stateful computing

The same wordCounte, this time to achieve the effect is: so far, count all the words in the past period of time.

Code:

Object UpdateStateBykeyWordCount {/ * programming routine: * 1. Get the programming entry, StreamingContext * 2. Build the first DStream * 3 through StreamingContext. Perform various transformation operations on DStream * 4. Perform output operations on the data results * 5. Submit sparkStreaming application * / def main (args: Array [String]): Unit = {/ / block log Logger.getLogger ("org.apache.hadoop") .setLevel (Level.ERROR) Logger.getLogger ("org.apache.zookeeper") .setLevel (Level.WARN) Logger.getLogger ("org.apache.hive") .setLevel (Level.WARN) / / 1. Get the programming entry, StreamingContext val conf = new SparkConf (). SetMaster ("local [2]") .setAppName ("NetWordCount") / / the second parameter, indicating the batch length val ssc = new StreamingContext (conf, Seconds (2)) ssc.checkpoint ("C:\\ z_data\\ checkPoint\\ checkPoint_1") / * * 2. Build the first DStream (read data over the network) * the first parameter: hostname * the second parameter: Port number * / val ReceiverInputDStream: ReceiverInputDStream [String] = ssc.socketTextStream ("test", 9999) / / 3. Perform various transformation operations on DStream val wordDS: DStream [(String,Int)] = ReceiverInputDStream.flatMap (msg = > {msg.split ("\\ s +")}) .map (word= > (word,1)) / * updateStateByKey is the status update function * updateFunc: (Seq [V], Option [S]) = > Option [S] * (UMagazine C) = > C * values:Seq [Int], state: Option [Int] * * @ param values: new value * @ param state: status value * @ return * / val updateDS: DStream [(String) Int)] = wordDS.updateStateByKey ((values: Seq [Int], state: Option [Int]) = > {Option (values.sum + state.getOrElse (0))}) / / 4. Output the data results, here is the printout updateDS.print () / / 5. Submit the sparkStreaming application ssc.start () ssc.awaitTermination ()}}

Use nc-kl 9999:

Observation console:

Batch2

Batch3

It is found that the results of two batches are aggregated, that is, the so-called stateful computation.

Note:

Ssc.checkpoint ("C:\\ z_data\\ checkPoint\ checkPoint_1")

The above code must be added, he will save the results of the last batch calculation, if not:

Error: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint ().

(2) HA of spark streaminging

   in the above updateStateByKey code, if the current program runs abnormally, it will lose data (after restart, the original calculated data cannot be found), because the programming entry StreamingContext is regenerated when the code is re-run, so that when the program exits abnormally, the last StreamingContext object can still be obtained the next time it starts, to ensure that the calculation data will not be lost. At this point, you need to store the StreamingContext object in the persistent system. In other words, you need to make the HA of the StreamingContext object.

Code:

Object WC_DriverHA {def main (args: Array [String]): Unit = {/ / block log Logger.getLogger ("org.apache.hadoop") .setLevel (Level.ERROR) Logger.getLogger ("org.apache.zookeeper") .setLevel (Level.WARN) Logger.getLogger ("org.apache.hive") .setLevel (Level.WARN) / * StreamingContext.getOrCreate () * the first parameter: checkpointPath Consistent with the checkpointPath directory in the following method * second parameter: creatingFunc: () = > StreamingContext: used to create StreamingContext objects * finally use StreamingContext.getOrCreate () to implement the HA of StreamingContext objects Make sure that when the program is run again, the previous state can still be restored to * / val ssc= StreamingContext.getActiveOrCreate ("C:\ z_data\\ checkPoint\ checkPoint_HA", functionToCreateContext) ssc.start () ssc.awaitTermination ()} def functionToCreateContext (): StreamingContext= {/ / 1. Get the programming entry, StreamingContext val conf = new SparkConf (). SetMaster ("local [2]") .setAppName ("NetWordCount") / / the second parameter, indicating the batch length val ssc = new StreamingContext (conf, Seconds (2)) ssc.checkpoint ("C:\\ z_data\\ checkPoint\\ checkPoint_HA") / * * 2. Build the first DStream (read data over the network) * the first parameter: hostname * the second parameter: Port number * / val ReceiverInputDStream: ReceiverInputDStream [String] = ssc.socketTextStream ("test", 9999) / / 3. Perform various transformation operations on DStream val wordDS: DStream [(String,Int)] = ReceiverInputDStream.flatMap (msg = > {msg.split ("\\ s +")}) .map (word= > (word,1)) val updateDS: DStream [(String,Int)] = wordDS.updateStateByKey ((values: Seq [Int]) State: Option [Int]) = > {Option (values.sum + state.getOrElse (0))} / / 4. Output the data results, here is the printout updateDS.print () / / 5. Submit the sparkStreaming application ssc.start () ssc.awaitTermination () ssc}}

Test:

  -first run normally for a period of time and calculate the result

  -stop the program

  -start again

  -verify that the program started again can get back the results calculated before the stop

Principle:

If    is executed for the first time, then there is no streamingContext object in this checkpointDriectory directory, so if you want to create it, the second time you run it, it will not be created again, then it will be read from the checkpointDriectory directory for recovery.

Note:

   normally, HA in this way can only persist state data to persistent files, and StreamingContext objects will not be persisted to CheckPointDriectory by default.

(3) Summary of checkpoint:   1) introduction of checkpoint:

There are two types of checkpoint for    to recover from a failure

Metadata information in    -Metadata checkpointing:driver nodes

 -Configuration: configuration for creating streaming applications

 -DStream: defines the DStream operation of a streaming program

 -Incomplete batches: batch job queuing but not yet completed. (the location where the program was last run)

   -Data checkpointing: save the generated RDD to reliable storage

 -RDD generated after calculation

 -data received in receiver, converted RDD

  2) timing of checkpoint launch:

  -recover from a failure of the driver running the application-metadata, (HA of driver)

  -start checkPoint:updateStateByKey or reduceByKeyAndWindow when using stateful computing.

  3) configuration of checkpoint:

  -when stateful computing:

     ssc.checkpoint ("C:\\ z_data\\ checkpoint")

HA of   -driver:

Ssc.checkpoint ("C:\\ z_data\\ checkpoint") ssc = StreamingContext.getOrCreate ("C:\\ z_data\\ checkpoint", functionToCreateContext) (4) transform operation of Spark Streaming

   introduces two important concepts when using transform operations:

   blacklist: if more operations are allowed than are not allowed, add the disallowed operations to the blacklist

   whitelist: if fewer operations are allowed than are not allowed, then add the allowed operations to the whitelist

Code:

Object _ 1Streaming_tranform {def main (args: Array [String]): Unit = {/ / define blacklist val black_list=List ("@", "#", "$" "%" Logger.getLogger ("org.apache.hadoop") .setLevel (Level.ERROR) Logger.getLogger ("org.apache.zookeeper") .setLevel (Level.WARN) Logger.getLogger ("org.apache.hive") .setLevel (Level.WARN) / / 1. Get the programming entry, StreamingContext val conf = new SparkConf (). SetMaster ("local [2]"). SetAppName ("_ 1Streaming_tranform") val ssc=new StreamingContext (conf,Seconds (2)) / / 2. Read data from the corresponding network port val inputDStream: ReceiverInputDStream [String] = ssc.socketTextStream ("test" 9999) / / 2.1broadcast the blacklist val bc = ssc.sparkContext.broadcast (black_list) / / 2.2 set checkpoint ssc.checkpoint ("C:\\ z_data\\ checkPoint\\ checkPoint_1") / / 3 Business processing val wordDStream: DStream [String] = inputDStream.flatMap (_ .split (")) / / transform method: take a RDD from DStream After transformsFunc calculation Return a new RDD val fileterdDStream: DStream [String] = wordDStream.transform (rdd= > {/ / filter out the data in the blacklist val blackList: List [String] = bc.value rdd.filter (word= > {! blackList.contains (word)})}) / / 3.2 count the corresponding words val resultDStream = fileterdDStream.map (msg = > (msg) UpdateStateByKey ((values: Seq [Int], stats: Option [Int]) = > {Option (values.sum + stats.getOrElse (0))}) / / 4 print output resultDStream.print () / / 5. Open streaming stream ssc.start () ssc.awaitTermination ()}}

The data in the blacklist is filtered:

(5) window operation of Spark Streaming

Note:

When doing the window operation:

  -the length of time of the data stream covered by the window, which must be a multiple of the batch interval

  -the length of time that elapsed from the previous window to the next window must be a multiple of the batch interval.

Pseudo code:

/ / 1. Get the programming entry, StreamingContext val conf = new SparkConf (). SetMaster ("local [2]"). SetAppName ("WordCount_Window") val ssc=new StreamingContext (conf,Seconds (batchInvail.toLong)) / / 2. Read data from the corresponding network port val inputDStream: ReceiverInputDStream [String] = ssc.socketTextStream (hostname,port.toInt) val lineDStream: DStream [String] = inputDStream.flatMap (_ .split (")) val wordDStream: DStream [(String, Int)] = lineDStream.map ((_, 1)) / * * every 4 seconds Calculate the data in the past 6 seconds * reduceFunc: data merge function * windowDuration: window size (data in the past 6 seconds) * slideDuration: window sliding time (every 4 seconds) * / val resultDStream: DStream [(String, Int)] = wordDStream.reduceByKeyAndWindow ((kv1:Int, kv2:Int) = > kv1+kv2, Seconds (batchInvail.toLong * 3) Seconds (batchInvail.toLong * 2) resultDStream.print () ssc.start () ssc.awaitTermination () (6) ForeachRDD operation of Spark Streaming

Concept:

Foreach: traversing every element in a distributed collection (rdd) foreachPartition: traversing every partition in a distributed collection (rdd) foreachRDD: traversing every RDD in a distributed collection (DStream)

This operator is used well, and usually the performance of the program will be greatly improved.

Pseudo code: / / this method means traversing every rdd windowDS.foreachRDD in DStream (rdd= > {if (! rdd.isEmpty ()) {rdd.mapPartitions (ptn= > {if (! ptn.isEmpty) {ptn.foreach (msg= > {/ / do the corresponding operation here). )})})

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report