

Implementation and usage of SparkStreaming

Updated 2025-03-29. From: SLTechnology News&Howtos


Shulou(Shulou.com)06/03 Report--

This article explains how to implement and use SparkStreaming. The explanation is simple, clear and easy to follow. Work through it step by step to learn how SparkStreaming is implemented and used.

I. Integrating DStreams with RDDs

1. The operator from the official documentation: transform
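The operator referred to here is `transform`. It applies an arbitrary RDD-to-RDD function to every batch of a DStream, and that is what allows a stream to be joined against a static RDD such as a blacklist. The following is a plain-Scala model, not Spark code. It assumes a stream is simply a `Seq` of batches. `TransformModel`, `transform`, `leftOuterJoin` and `dropBlacklisted` are hypothetical names that only mimic the semantics of `DStream.transform` and `PairRDDFunctions.leftOuterJoin` on in-memory collections.

```scala
// Plain-Scala model of DStream.transform; no Spark dependency, not Spark's API.
// A stream is modeled as a sequence of batches, one per batch interval.
object TransformModel {
  type Batch[A] = Seq[A]

  // Rough analogue of DStream.transform: apply a batch-to-batch function to every batch.
  def transform[A, B](stream: Seq[Batch[A]])(f: Batch[A] => Batch[B]): Seq[Batch[B]] =
    stream.map(f)

  // In-memory analogue of leftOuterJoin: every left row is kept, paired with
  // Some(value) for each matching right key, or with None when nothing matches.
  def leftOuterJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, Option[W]))] =
    left.flatMap { case (k, v) =>
      val matches: Seq[Option[W]] = right.collect { case (`k`, w) => Some(w) }
      (if (matches.isEmpty) Seq(None) else matches).map(o => (k, (v, o)))
    }

  // The article's filter: keep a row unless its blacklist flag is Some(true).
  def dropBlacklisted(batch: Batch[(String, Long)], blacklist: Seq[(String, Boolean)]): Batch[(String, Long)] =
    leftOuterJoin(batch, blacklist)
      .filter { case (_, (_, flag)) => flag.getOrElse(false) != true }
      .map { case (domain, (traffic, _)) => (domain, traffic) }

  def main(args: Array[String]): Unit = {
    val blacklist = Seq(("www.baidu.com", true))
    // Two hypothetical batches built from the article's sample domains and traffic values
    val stream: Seq[Batch[(String, Long)]] = Seq(
      Seq(("www.xinlang.com", 8888L), ("www.baidu.com", 7777L)),
      Seq(("www.xinlang.com", 9999L)))
    transform(stream)(dropBlacklisted(_, blacklist)).foreach(println)
  }
}
```

Running `main` prints one filtered batch per line. The `www.baidu.com` row is dropped because its join result carries `Some(true)`. Domains that have no match get `None` and pass the `getOrElse(false) != true` test.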

2. Use case. A very common production scenario: one data set contains many domain names and another contains a blacklist, and the blacklisted domains must be filtered out.

Data 1: log information, a DStream with the fields domain,traffic. The domains are xinlang.com, xinlang.com and baidu.com.

Data 2: an existing blacklist file, loaded as an RDD with the field domain. It contains baidu.com.

3. Implementing the requirement with RDDs

    package sparkstreaming02

    import org.apache.spark.{SparkConf, SparkContext}
    import scala.collection.mutable.ListBuffer

    object Demo1 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Demo1").setMaster("local[2]")
        val sc = new SparkContext(conf)

        // data 1: (domain, traffic)
        val input1 = new ListBuffer[(String, Long)]
        input1.append(("www.xinlang.com", 8888))
        input1.append(("www.xinlang.com", 9999))
        input1.append(("www.baidu.com", 7777))
        val data1 = sc.parallelize(input1.toList)

        // data 2: the blacklist; join needs key,value pairs, so each domain maps to true
        val input2 = new ListBuffer[(String, Boolean)]
        input2.append(("www.baidu.com", true))
        val data2 = sc.parallelize(input2.toList)

        // leftOuterJoin gives (domain, (traffic, Option[Boolean])); drop blacklisted rows
        data1.leftOuterJoin(data2)
          .filter(x => x._2._2.getOrElse(false) != true)
          .map(x => (x._1, x._2._1))
          .collect()
          .foreach(println)

        sc.stop()
      }
    }

4. Implementing it with Spark Streaming

    package sparkstreaming02

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import scala.collection.mutable.ListBuffer

    object Streaming {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Streaming").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(10))

        // data 1: DStream of "domain,traffic" lines from a socket
        val lines = ssc.socketTextStream("s201", 9999)

        // data 2: the blacklist as an RDD
        val input2 = new ListBuffer[(String, Boolean)]
        input2.append(("www.baidu.com", true))
        val data2 = ssc.sparkContext.parallelize(input2.toList)

        lines.map(x => (x.split(",")(0), x.split(",")(1)))
          .transform(rdd => {
            rdd.leftOuterJoin(data2)
              .filter(x => x._2._2.getOrElse(false) != true) // pay attention: filter after the join
              .map(x => (x._1, x._2._1))
          })
          .print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

II. Writing Spark Streaming output to an external data source

1. Writing to an external data source is done with foreachRDD, but there are several pitfalls when using it.

2. An incorrect example from the official documentation

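3. Reason: the connection is created on the driver, but the records are processed on the executors. The connection object therefore has to be serialized and sent to the executors, which fails with a serialization error.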
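The failure can be reproduced without Spark. By default, Spark ships the function passed to `rdd.foreach` to the executors using Java serialization, so everything that function captures must be `Serializable`. In this sketch, `FakeConnection`, `ForeachTask` and `SerializationDemo` are hypothetical stand-ins for a JDBC connection and for the closure Spark would ship. Serializing the task fails with `java.io.NotSerializableException`, and the exception names the connection class.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a JDBC connection; like a real one, it is NOT Serializable.
class FakeConnection {
  def send(record: String): Unit = println(s"sent $record")
}

// Hypothetical stand-in for the closure shipped to executors: it captures the
// driver-side connection, so serializing the task also serializes the connection.
final case class ForeachTask(connection: FakeConnection) {
  def run(record: String): Unit = connection.send(record)
}

object SerializationDemo {
  // Java-serializes an object; returns the byte count, or the message of the exception.
  def trySerialize(obj: AnyRef): Either[String, Int] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try {
      out.writeObject(obj)
      out.flush()
      Right(bytes.size())
    } catch {
      case e: NotSerializableException => Left(e.getMessage)
    } finally out.close()
  }

  def main(args: Array[String]): Unit = {
    println(trySerialize("www.xinlang.com"))               // a String serializes fine
    println(trySerialize(ForeachTask(new FakeConnection))) // fails on FakeConnection
  }
}
```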

4. First fix: create the connection on the executor side. The drawback is that a new connection is created for every single record:

    words.foreachRDD { rdd =>
      rdd.foreach { record =>
        val connection = createConnection() // executed on the executor, once per record
        val word = record._1
        val count = record._2.toInt
        val sql = s"insert into wc (wc,count) values ('$word',$count)"
        connection.createStatement().execute(sql)
      }
    }

5. Final solution: use foreachPartition, so that each partition opens one connection on the executor and reuses it for all of its records:

    package sparkstreaming02

    import java.sql.{Connection, DriverManager}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object MysqlStreaming {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("MysqlStreaming")
        val ssc = new StreamingContext(conf, Seconds(1))
        val lines = ssc.socketTextStream("s201", 9999)
        val words = lines.flatMap(x => x.split(" ")).map((_, 1)).reduceByKey(_ + _)

        // Wrong (the official-docs example): connection created on the driver
        // words.foreachRDD { rdd =>
        //   val connection = createConnection() // executed at the driver
        //   rdd.foreach { record =>
        //     val sql = s"insert into wc (word,count) values ('${record._1}',${record._2})"
        //     connection.createStatement().execute(sql) // executed at the worker
        //   }
        // }

        // Final: one connection per partition, created on the executor
        words.foreachRDD { rdd =>
          rdd.foreachPartition { partitionOfRecords =>
            val connection = createConnection()
            partitionOfRecords.foreach { record =>
              val word = record._1
              val count = record._2
              val sql = s"insert into wc (wc,count) values ('$word',$count)"
              connection.createStatement().execute(sql)
            }
            connection.close()
          }
        }

        ssc.start()
        ssc.awaitTermination()
      }

      def createConnection(): Connection = {
        Class.forName("com.mysql.cj.jdbc.Driver")
        DriverManager.getConnection(
          "jdbc:mysql://localhost:3306/hive?serverTimezone=UTC&useSSL=false", "root", "123456")
      }
    }

6. A pitfall when inserting into the database: string values must be wrapped in single quotes. For example, in

    val sql = s"insert into wc (wc,count) values ($word,$count)"

word is a string, so this statement reports an error. The correct version puts single quotes, not double quotes, around $word:

    val sql = s"insert into wc (wc,count) values ('$word',$count)"
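The difference in cost between points 4 and 5 can be simulated without Spark or a database. In this hypothetical sketch, a batch RDD is modeled as a `Seq` of partitions, and `FakeConnection` only records the statements it is given. `perRecord` opens a connection for every record, as in point 4. `perPartition` opens one connection per partition, as in point 5.

```scala
// Plain-Scala simulation (no Spark, no database) of how many connections each pattern opens.
object ConnectionPerPartition {
  // Hypothetical connection that only remembers the statements it was asked to run.
  final class FakeConnection {
    val executed = scala.collection.mutable.ArrayBuffer.empty[String]
    def execute(sql: String): Unit = executed += sql
    def close(): Unit = ()
  }

  // One batch RDD modeled as its partitions, each holding (word, count) pairs.
  type Partitions = Seq[Seq[(String, Int)]]

  private def insert(conn: FakeConnection, record: (String, Int)): Unit =
    conn.execute(s"insert into wc (wc, count) values ('${record._1}', ${record._2})")

  // Point 4: createConnection() inside rdd.foreach, i.e. one connection per record.
  def perRecord(parts: Partitions): Int = {
    var opened = 0
    parts.foreach(_.foreach { record =>
      val conn = new FakeConnection
      opened += 1
      insert(conn, record)
      conn.close()
    })
    opened
  }

  // Point 5: createConnection() inside rdd.foreachPartition, i.e. one connection per partition.
  def perPartition(parts: Partitions): Int = {
    var opened = 0
    parts.foreach { partition =>
      val conn = new FakeConnection
      opened += 1
      partition.foreach(record => insert(conn, record))
      conn.close()
    }
    opened
  }

  def main(args: Array[String]): Unit = {
    val batch: Partitions = Seq(Seq(("a", 1), ("b", 2), ("c", 3)), Seq(("d", 4), ("e", 5)))
    println(s"per record: ${perRecord(batch)} connections")       // 5
    println(s"per partition: ${perPartition(batch)} connections") // 2
  }
}
```

For the batch in `main`, which has two partitions and five records, the per-record pattern opens 5 connections and the per-partition pattern opens 2. The Spark Streaming programming guide goes one step further. It suggests reusing connections across batches through a static, lazily initialized connection pool.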
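The quoting problem in point 6 is easy to see by printing the generated SQL. In this sketch, `WordCountSql` and its methods are hypothetical helpers. `unquoted` builds the failing statement, in which MySQL typically reads `spark` as a column name rather than a string. `quoted` applies the article's fix. `insertWordCount` shows a different technique that the article does not use: a JDBC `PreparedStatement` with `?` placeholders, which avoids manual quoting entirely. It needs a live `java.sql.Connection`, so `main` does not call it.

```scala
import java.sql.Connection

object WordCountSql {
  // The failing version from point 6: the word is spliced in without quotes,
  // so the database parses it as an identifier instead of a string literal.
  def unquoted(word: String, count: Int): String =
    s"insert into wc (wc, count) values ($word, $count)"

  // The article's fix: wrap the string value in single quotes.
  // A word that itself contains ' would still break this statement.
  def quoted(word: String, count: Int): String =
    s"insert into wc (wc, count) values ('$word', $count)"

  // Alternative not used in the article: JDBC placeholders bind the values, no manual quoting.
  val parameterized: String = "insert into wc (wc, count) values (?, ?)"

  def insertWordCount(conn: Connection, word: String, count: Int): Unit = {
    val stmt = conn.prepareStatement(parameterized)
    try {
      stmt.setString(1, word)
      stmt.setInt(2, count)
      stmt.executeUpdate()
    } finally stmt.close()
  }

  def main(args: Array[String]): Unit = {
    println(unquoted("spark", 3)) // insert into wc (wc, count) values (spark, 3)
    println(quoted("spark", 3))   // insert into wc (wc, count) values ('spark', 3)
  }
}
```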






© 2024 shulou.com SLNews company. All rights reserved.
