This article shares a sample analysis of Spark Streaming operator development. The content is quite practical, so it is offered here for reference; follow along to have a look.
Examples of Spark Streaming operator development
Transform operator development
When the transform operation is applied to a DStream, it can perform any RDD-to-RDD transformation, so it can implement operations that the DStream API does not provide. For example, the DStream API offers no way to join each batch of a DStream with a specific RDD: the join operator on a DStream can only join another DStream. With transform, however, this can be done.
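To make the idea concrete before the full example below, here is a minimal, self-contained sketch; the socket host/port, the record layout, and the lookup data are assumptions for illustration, not part of the example that follows. Each batch of the DStream is keyed and then joined with a static RDD inside transform, which the DStream API alone cannot do.

package StreamingDemo

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TransformJoinSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TransformJoinSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Static lookup RDD: user -> flag (illustrative data)
    val staticRDD = ssc.sparkContext.parallelize(Seq(("zs", true), ("lisi", true)))

    ssc.socketTextStream("localhost", 9999)     // assumed nc source; adjust host/port
      .map(line => (line.split(" ")(0), line))  // key each record by its first token
      .transform(rdd => rdd.join(staticRDD))    // ordinary RDD join inside transform
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}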
Example: real-time filtering of blacklist users
package StreamingDemo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Real-time blacklist filtering
 */
object TransformDemo {
  def main(args: Array[String]): Unit = {
    // Set log level
    Logger.getLogger("org").setLevel(Level.WARN)

    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Create the blacklist RDD
    val blackRDD = ssc.sparkContext.parallelize(Array(("zs", true), ("lisi", true)))

    // Receive data from nc through a socket
    val linesDStream = ssc.socketTextStream("Hadoop01", 6666)

    /**
     * Filter out statements from blacklisted users, e.g.
     *   zs sb sb sb sb
     *   lisi fuck fuck fuck
     *   jack hello
     */
    linesDStream
      .map(x => {
        val info = x.split(" ")
        (info(0), info.toList.tail.mkString(" "))
      })
      .transform(rdd => {
        // transform is an RDD -> RDD operation, so the return value must be an RDD
        /**
         * After the leftOuterJoin, the result looks like:
         *   (zs, (sb sb sb sb, Some(true)))
         *   (lisi, (fuck fuck fuck, Some(true)))
         *   (jack, (hello, None))
         */
        val joinRDD = rdd.leftOuterJoin(blackRDD)
        // Some(true) means the user is blacklisted; None means not blacklisted,
        // so keep only the non-blacklisted users
        val filterRDD = joinRDD.filter(x => x._2._2.isEmpty)
        filterRDD
      })
      .map(x => (x._1, x._2._1))
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
Test
Start nc and send users together with their messages.
You can see that the program filters out the comments of blacklisted users in real time.
UpdateStateByKey operator development
The updateStateByKey operator maintains an arbitrary piece of state per key and keeps updating it as new information arrives. For every batch, Spark applies the state update function to every existing key, whether or not that key has new data in the batch. If the update function returns None, the state for that key is removed; the update function is also invoked for keys that appear for the first time.
To use this operator, two steps are required:
Define the state: the state can be of any data type.
Define the state update function: a function that specifies how to combine the previous state with the new values from the input stream to produce the updated state.
Note: the updateStateByKey operation requires that the checkpoint mechanism be enabled. A minimal sketch of the update function follows; the full example comes after it.
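The sketch below assumes a DStream[(String, Int)] of (word, 1) pairs (the parameter name pairs is hypothetical) produced by a StreamingContext on which checkpointing has already been enabled. It only illustrates the update function's contract, including the fact that returning None removes a key's state.

import org.apache.spark.streaming.dstream.DStream

object UpdateStateSketch {
  // values: the new values for the key in this batch; state: the previous state
  def runningCounts(pairs: DStream[(String, Int)]): DStream[(String, Int)] =
    pairs.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
      val newCount = state.getOrElse(0) + values.sum // step 1: the state here is just an Int
      if (values.isEmpty && newCount == 0) None      // step 2: returning None drops this key's state
      else Some(newCount)                            //         Some(...) keeps the updated state
    })
}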
Example: real-time WordCount based on cache
package StreamingDemo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

/**
 * Cache-based real-time WordCount: counts word occurrences across all batches
 */
object UpdateStateByKeyDemo {
  def main(args: Array[String]): Unit = {
    // Set log level
    Logger.getLogger("org").setLevel(Level.WARN)

    /**
     * If security authentication is not enabled, or the user obtained from Kerberos is null,
     * the HADOOP_USER_NAME environment variable is used as the Hadoop execution user.
     * Security authentication is not enabled in this experiment, so even without setting it
     * explicitly, the local user name is picked up automatically.
     */
    // System.setProperty("HADOOP_USER_NAME", "Setsuna")

    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Set the checkpoint storage path
    ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint")

    // Create the input DStream
    val lineDStream = ssc.socketTextStream("Hadoop01", 6666)
    val wordDStream = lineDStream.flatMap(_.split(" "))
    val pairsDStream = wordDStream.map((_, 1))

    /**
     * state: the previous state value
     * values: the values for this key in the current batch
     */
    val resultDStream = pairsDStream.updateStateByKey(
      (values: Seq[Int], state: Option[Int]) => {
        // If state is None, the word has not been counted yet, so the counter starts at 0
        var count = state.getOrElse(0)
        // Traverse values and accumulate the counts of the new words
        for (value <- values) {
          count += value
        }
        Option(count)
      })
    resultDStream.print()

    ssc.start()
    ssc.awaitTermination()
  }

  // Sliding-window variant: every 2 seconds, print the three most frequent words of the
  // last 5 seconds. The StreamingContext driving pairsDStream must use a batch interval
  // that evenly divides both durations (e.g. 1 second).
  def topThreeWords(pairsDStream: DStream[(String, Int)]): Unit = {
    pairsDStream
      .reduceByKeyAndWindow(
        (x: Int, y: Int) => x + y,
        Seconds(5), // window length: 5 seconds
        Seconds(2)) // sliding interval: 2 seconds
      .transform(rdd => {
        // transform converts one RDD into another: sort by occurrence count in
        // descending order and take the top three words
        val info: Array[(String, Int)] = rdd.sortBy(_._2, false).take(3)
        // Convert the Array back into an RDD
        val resultRDD = rdd.sparkContext.parallelize(info)
        resultRDD
      })
      .map(x => s"${x._1} appears: ${x._2}")
      .print()
  }
}
Test result
Overview of DStream output operations
Spark Streaming allows DStream data to be written out to external systems. All computation in a DStream is triggered by its output operations, and inside the foreachRDD output operation an action must additionally be performed on the RDD in order to trigger the computation for each batch. The output operations are summarized below.
print(): prints the first 10 elements of each batch of the DStream on the Driver. Mainly used for testing, or to simply trigger a job when no other output operation is needed.
saveAsTextFiles(prefix, [suffix]): saves the contents of the DStream as text files. The files generated during each batch interval are named prefix-TIME_IN_MS[.suffix].
saveAsObjectFiles(prefix, [suffix]): serializes the contents of the DStream as objects and saves them in SequenceFile format. The files generated during each batch interval are named prefix-TIME_IN_MS[.suffix].
saveAsHadoopFiles(prefix, [suffix]): saves the contents of the DStream as Hadoop files. The files generated during each batch interval are named prefix-TIME_IN_MS[.suffix].
foreachRDD(func): the most basic output operation. Applies the function func to each RDD in the DStream, which can write the data to an external system such as a file or a database. Note that func is executed in the Driver process of the streaming application.
ForeachRDD operator development
foreachRDD is the most commonly used output operation. It iterates over every RDD generated by the DStream and can write the data in each RDD to external storage such as files, databases, or caches. Inside it, an action such as foreach is usually performed on the RDD.
Using foreachRDD to write to a database
Typically a Connection, such as a JDBC Connection, is created inside foreachRDD, and the data is then written to external storage through that Connection.
Pitfall 1: creating the Connection outside the RDD's foreach operation
dstream.foreachRDD { rdd =>
  val connection = createNewConnection()
  rdd.foreach { record =>
    connection.send(record)
  }
}
This approach is wrong: it requires the Connection object to be serialized and shipped to every task, but Connection objects are not serializable, so they cannot be transferred.
Pitfall 2: creating the Connection inside the RDD's foreach operation
dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}
This approach works, but it is inefficient: a Connection object is created for every single record in the RDD, and creating Connection objects is usually expensive.
Reasonable approaches
First: use the RDD's foreachPartition operation and create the Connection object inside it. This creates one Connection object per RDD partition, which saves a lot of resources (a sketch follows this list).
Second: hand-roll a static connection pool, use the RDD's foreachPartition operation, obtain a connection from the pool through a static method, and return it to the pool once it has been used. This way connections can be reused across the partitions of multiple RDDs.
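Here is the sketch of the first approach from the list above. createNewConnection() and connection.send(...) are the same placeholder helpers used in the earlier snippets, stubbed out here so the sketch compiles; they stand in for a real client such as a JDBC connection.

import org.apache.spark.streaming.dstream.DStream

object ForeachPartitionSketch {
  // Placeholder connection type standing in for a real client
  trait SimpleConnection {
    def send(record: String): Unit
    def close(): Unit
  }

  def createNewConnection(): SimpleConnection = new SimpleConnection {
    def send(record: String): Unit = println(record) // stub: just print the record
    def close(): Unit = ()
  }

  def writeOut(dstream: DStream[String]): Unit = {
    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        val connection = createNewConnection()                // one connection per partition, created on the executor
        partition.foreach(record => connection.send(record))
        connection.close()
      }
    }
  }
}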
Example: real-time global WordCount, saving the results to a MySQL database
The MySQL table is created with the following statement:
CREATE TABLE wordcount (
  word varchar(100) CHARACTER SET utf8 NOT NULL,  -- the varchar length was lost in the original; 100 is an assumption
  count int(10) NOT NULL,
  PRIMARY KEY (word)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
Add mysql-connector-java-5.1.40-bin.jar to IDEA
The code is as follows
For the connection pool, the first idea was to use a static block to create a fixed pool and fetch connections from it directly. But considering that the pool may run out of connections, the approach below works better: a pool of connections is instantiated on first use and connections are taken from it; when all connections have been taken out and the pool is empty, another batch of connections is instantiated.
package StreamingDemo

import java.sql.{Connection, DriverManager, SQLException}
import java.util

object JDBCManager {
  var connectionQue: java.util.LinkedList[Connection] = null

  /**
   * Get a connection object from the connection pool
   * @return
   */
  def getConnection(): Connection = {
    synchronized {
      try {
        // If the connection pool is empty, instantiate a linked list of Connection objects
        if (connectionQue == null || connectionQue.isEmpty) {
          connectionQue = new util.LinkedList[Connection]()
          // The pool size, JDBC URL and credentials were lost in the original listing;
          // the values below are placeholders to substitute with your own
          for (i <- 0 until 10) {
            val connection = DriverManager.getConnection(
              "jdbc:mysql://Hadoop01:3306/test?characterEncoding=utf-8", "root", "root")
            connectionQue.push(connection)
          }
        }
      } catch {
        case e: SQLException => e.printStackTrace()
      }
      // If the connection pool is not empty, return the head element and remove it from the linked list
      connectionQue.poll()
    }
  }

  /**
   * When a connection object is no longer needed, call this method to return it to the pool
   * @param connection
   */
  def returnConnection(connection: Connection) = {
    // Put the element back into the pool
    connectionQue.push(connection)
  }

  def main(args: Array[String]): Unit = {
    // Test getConnection in main
    getConnection()
    println(connectionQue.size())
  }
}
Wordcount code
package StreamingDemo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, streaming}
import org.apache.spark.streaming.StreamingContext

object ForeachRDDDemo {
  def main(args: Array[String]): Unit = {
    // Set the log level to avoid too much INFO output
    Logger.getLogger("org").setLevel(Level.WARN)

    // Set the Hadoop user name; it also works without adding this
    System.setProperty("HADOOP_USER_NAME", "Setsuna")

    // Basic Spark configuration
    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[2]")
    val ssc = new StreamingContext(conf, streaming.Seconds(2))

    // Because updateStateByKey is used, a checkpoint directory is required
    ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint")

    // Read from the socket, as configured for nc
    val linesDStream = ssc.socketTextStream("Hadoop01", 6666)

    val wordCountDStream = linesDStream
      .flatMap(_.split(" ")) // split on whitespace
      .map((_, 1))           // generate (word, 1)
      .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
        // Update the state in real time
        var count = state.getOrElse(0)
        for (value <- values) {
          count += value
        }
        Option(count)
      })

    wordCountDStream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        rdd.foreachPartition(part => {
          // Get a connection from the pool
          val connection = JDBCManager.getConnection()
          part.foreach(data => {
            // Insert the word count into the wordcount table;
            // if the key already exists, the on duplicate key update clause updates it
            val sql =
              s"insert into wordcount (word, count) " +
              s"values ('${data._1}', ${data._2}) on duplicate key update count=${data._2}"
            // Execute the SQL statement with a PreparedStatement
            val pstmt = connection.prepareStatement(sql)
            pstmt.executeUpdate()
          })
          // After writing the partition, return the connection to the pool
          JDBCManager.returnConnection(connection)
        })
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
Open nc and enter data
Querying the wordcount table from another terminal shows the results changing in real time.