This article presents an example-based analysis of SparkStreaming operator development. The content is concise and easy to follow, and I hope you will take something away from the detailed walkthrough below.
An example of Spark Streaming operator development
Transform operator development
When the transform operation is applied to a DStream, it can be used to perform any RDD-to-RDD transformation, including operations that the DStream API does not provide. For example, the DStream API offers no way to join each batch in a DStream with a specific RDD: the join operator on a DStream can only join another DStream. With the transform operation, however, this can be implemented.
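To make the pattern concrete, here is a minimal sketch of transform joining every batch with a static RDD (the host localhost, port 9999 and the lookup data are illustrative assumptions, not the setup used in the example below):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Minimal sketch of the transform pattern: join every batch of a DStream with a static RDD.
// Host "localhost", port 9999 and the lookup data are illustrative assumptions.
object TransformJoinSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TransformJoinSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    // a static RDD that every batch will be joined against
    val lookupRDD = ssc.sparkContext.parallelize(Seq(("zs", "vip"), ("lisi", "normal")))

    // lines arrive as "user message ..."; key each line by its first word
    val pairs = ssc.socketTextStream("localhost", 9999)
      .map(line => (line.split(" ")(0), line))

    // transform hands us the RDD of each batch, so any RDD-to-RDD operation is allowed,
    // including a join with an RDD that is not a DStream
    val joined = pairs.transform(rdd => rdd.join(lookupRDD))
    joined.print()

    ssc.start()
    ssc.awaitTermination()
  }
}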
Example: real-time filtering of blacklisted users
package StreamingDemo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Real-time blacklist filtering
 */
object TransformDemo {
  def main(args: Array[String]): Unit = {
    // set log level
    Logger.getLogger("org").setLevel(Level.WARN)
    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    // create a blacklist RDD
    val blackRDD = ssc.sparkContext.parallelize(Array(("zs", true), ("lisi", true)))

    // get data from nc through the socket
    val linesDStream = ssc.socketTextStream("Hadoop01", 6666)

    /**
     * filter out comments from blacklisted users, e.g. for the input
     *   zs sb sb sb sb
     *   lisi fuck fuck fuck
     *   jack hello
     */
    linesDStream
      .map(x => {
        val info = x.split(" ")
        (info(0), info.toList.tail.mkString(" "))
      })
      .transform(rdd => {
        // transform is an RDD -> RDD operation, so the return value must be an RDD
        /**
         * after the leftOuterJoin operation the result looks like:
         *   (zs, (sb sb sb sb, Some(true)))
         *   (lisi, (fuck fuck fuck, Some(true)))
         *   (jack, (hello, None))
         */
        val joinRDD = rdd.leftOuterJoin(blackRDD)
        // Some(true) marks a blacklisted user and None a user not on the blacklist, so keep the None entries
        val filterRDD = joinRDD.filter(x => x._2._2.isEmpty)
        filterRDD
      })
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
Test
Start nc and pass in users and their messages.
You can see that the program filters out the comments of blacklisted users in real time.
UpdateStateByKey operator development
The updateStateByKey operator maintains an arbitrary state per key while continuously updating it with new information. For each batch, Spark applies the state update function to every key that already has state, whether or not the key has new values in the batch; if the update function returns None, the state for that key is deleted. The update function is also executed for every new key that appears in the batch.
Using this operator takes two steps:
Define the state: the state can be of any data type.
Define the state update function: specify with a function how to use the previous state and the new values from the input stream to update the state.
Note: the updateStateByKey operation requires that the checkpoint mechanism be enabled.
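For illustration, here is a minimal sketch of such a state update function (the threshold of 10 is an arbitrary assumption used only to show the behaviour): returning Some keeps the accumulated state, while returning None removes the state kept for that key.

// Minimal sketch of a state update function for updateStateByKey.
// The threshold of 10 is arbitrary and only illustrates that returning None drops the key's state.
object UpdateFuncSketch {
  val updateFunc: (Seq[Int], Option[Int]) => Option[Int] = (values, state) => {
    val newCount = state.getOrElse(0) + values.sum   // combine the previous state with the new values
    if (newCount > 10) None                          // None deletes the state kept for this key
    else Some(newCount)                              // Some keeps/updates the state
  }
  // usage: pairsDStream.updateStateByKey(updateFunc), with ssc.checkpoint(...) enabled
}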
Example: real-time WordCount based on cache
package StreamingDemo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Cache-based real-time WordCount, counting the occurrences of every word globally
 */
object UpdateStateByKeyDemo {
  def main(args: Array[String]): Unit = {
    // set log level
    Logger.getLogger("org").setLevel(Level.WARN)

    /**
     * If security authentication is not enabled, or the user obtained from Kerberos is null,
     * the HADOOP_USER_NAME environment variable is read and its value is used as the Hadoop user.
     * In this experiment it picked up my user name automatically even without setting it explicitly.
     */
    // System.setProperty("HADOOP_USER_NAME", "Setsuna")

    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    // set the path where checkpoints are stored
    ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint")

    // create the input DStream
    val lineDStream = ssc.socketTextStream("Hadoop01", 6666)
    val wordDStream = lineDStream.flatMap(_.split(" "))
    val pairsDStream = wordDStream.map((_, 1))

    /**
     * values: the values for the key in the current batch
     * state:  the previous state value for the key
     */
    val resultDStream = pairsDStream.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
      // when state is None the word has not been counted yet, so the counter starts from 0
      var count = state.getOrElse(0)
      // iterate over values and accumulate the counts of the new occurrences
      for (value <- values) {
        count += value
      }
      Option(count)
    })

    resultDStream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
Window operator development
Example: counting the top three search words over a 5-second window, sliding every 2 seconds. The StreamingContext ssc and the (word, 1) pair DStream pairsDStream are built from the nc socket in the same way as in the example above.

    pairsDStream
      .reduceByKeyAndWindow(
        (x: Int, y: Int) => x + y,
        // window length: 5 seconds
        Seconds(5),
        // sliding interval: 2 seconds
        Seconds(2))
      .transform(rdd => {
        // the transform operator processes the windowed rdd and converts it into another rdd:
        // sort by the number of occurrences in descending order and take the top three search words
        val info: Array[(String, Int)] = rdd.sortBy(_._2, false).take(3)
        // convert the Array back into an RDD
        val resultRDD = ssc.sparkContext.parallelize(info)
        resultRDD
      })
      .map(x => s"${x._1} appears: ${x._2}")
      .print()

    ssc.start()
    ssc.awaitTermination()
Test result
Overview of DStream Output operation
Spark Streaming allows DStream data to be output to external systems. All computations in a DStream are triggered by output operations, and inside foreachRDD an RDD action must be performed in order to trigger the computation logic for each batch.
Output operations and their descriptions:
print(): prints the first 10 elements of each batch of data in the DStream on the Driver. It is mainly used for testing, or to simply trigger a job when no other output operation is needed.
saveAsTextFiles(prefix, [suffix]): saves the contents of the DStream as text files. The files generated during each batch interval are named prefix-TIME_IN_MS[.suffix].
saveAsObjectFiles(prefix, [suffix]): serializes the contents of the DStream into Java objects and saves them as SequenceFiles. The files generated during each batch interval are named prefix-TIME_IN_MS[.suffix].
saveAsHadoopFiles(prefix, [suffix]): saves the contents of the DStream as Hadoop files. The files generated during each batch interval are named prefix-TIME_IN_MS[.suffix].
foreachRDD(func): the most general output operation; applies the function func to each RDD in the DStream, which outputs the data to an external system, such as saving the RDD to files or writing it over the network to a database. Note that func is executed in the Driver process running the streaming application.
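As a quick illustration of the file-based output, here is a minimal sketch, assuming a (word, count) DStream named wordCountDStream like the one built later in this article; the HDFS output path and the txt suffix are placeholders.

// Minimal sketch: the path prefix and the "txt" suffix below are placeholders.
// Each batch interval produces files named <prefix>-TIME_IN_MS.txt.
wordCountDStream.saveAsTextFiles("hdfs://Hadoop01:9000/output/wordcount", "txt")

// print() outputs the first 10 elements of each batch on the Driver; handy while developing
wordCountDStream.print()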
ForeachRDD operator development
foreachRDD is the most commonly used output operation. It traverses each RDD generated in the DStream and processes it, writing the data in each RDD to external storage such as files, databases, or caches. An action, such as foreach, is usually performed on the RDD inside it.
Using foreachRDD to operate on a database
Usually a Connection, such as a JDBC Connection, is created inside foreachRDD, and the data is then written to external storage through that Connection.
Misconception 1: creating the Connection outside the RDD's foreach operation
dstream.foreachRDD { rdd =>
  val connection = createNewConnection()
  rdd.foreach { record =>
    connection.send(record)
  }
}
This approach is wrong: it causes the Connection object to be serialized and shipped to every task, but Connection objects do not support serialization, so they cannot be transferred.
Misconception 2: creating the Connection inside the RDD's foreach operation
dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}
Although this approach works, it is inefficient: a Connection object is created for every record in the RDD, and creating Connection objects is usually expensive.
The reasonable approaches
First: use the RDD's foreachPartition operation and create the Connection object inside it. This creates one Connection object per partition of the RDD, which already saves a lot of resources (a sketch of this approach follows below).
Second: manually encapsulate a static connection pool, use the RDD's foreachPartition operation, obtain a connection from the static pool through a static method, and put the connection back into the pool after use. This way connections can even be reused across the partitions of multiple RDDs.
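A minimal sketch of the first approach, reusing the hypothetical createNewConnection() and send() helpers from the snippets above: the connection is created once per partition on the executor instead of once per record.

// Sketch of approach 1: one connection per RDD partition.
// createNewConnection() and connection.send() are the same hypothetical helpers as above.
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()              // created on the executor, once per partition
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()                                  // release the connection when the partition is done
  }
}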
Example: real-time global WordCount, saving the results to a MySQL database
The MySQL table creation statement is as follows:
CREATE TABLE wordcount (
  word varchar CHARACTER SET utf8 NOT NULL,
  count int(10) NOT NULL,
  PRIMARY KEY (word)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
Add mysql-connector-java-5.1.40-bin.jar to IDEA
The code is as follows
A note on the connection pool code: the first idea was to build the pool in a static block and fetch connections from it directly, but considering that the pool could run short of connections, the approach below works better. A connection pool is instantiated and connections are obtained from it; when all the connections have been handed out and the pool is empty, another pool is instantiated.
package StreamingDemo

import java.sql.{Connection, DriverManager, SQLException}
import java.util

object JDBCManager {
  var connectionQue: java.util.LinkedList[Connection] = null

  /**
   * get a connection object from the database connection pool
   * @return
   */
  def getConnection(): Connection = {
    synchronized({
      try {
        // if the connection pool is empty, instantiate a linked list of Connection objects
        if (connectionQue == null) {
          connectionQue = new util.LinkedList[Connection]()
          // pre-create a batch of connections and push them into the pool
          // (the pool size, JDBC URL, user and password below are placeholders; use your own MySQL settings)
          for (i <- 0 until 10) {
            val connection = DriverManager.getConnection(
              "jdbc:mysql://Hadoop01:3306/test?characterEncoding=utf-8", "root", "root")
            connectionQue.push(connection)
          }
        }
      } catch {
        case e: SQLException => e.printStackTrace()
      }
      // if the connection pool is not empty, return the head element and remove it from the linked list
      return connectionQue.poll()
    })
  }

  /**
   * when a connection is no longer needed, call this method to return it to the pool
   * @param connection
   */
  def returnConnection(connection: Connection) = {
    // push the connection back into the pool
    connectionQue.push(connection)
  }

  def main(args: Array[String]): Unit = {
    // test getConnection()
    getConnection()
    println(connectionQue.size())
  }
}
Wordcount code
package StreamingDemo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, streaming}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ForeachRDDDemo {
  def main(args: Array[String]): Unit = {
    // set the log level to avoid too much INFO output
    Logger.getLogger("org").setLevel(Level.WARN)

    // set the Hadoop user; this can also be omitted
    System.setProperty("HADOOP_USER_NAME", "Setsuna")

    // basic Spark configuration
    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[2]")
    val ssc = new StreamingContext(conf, streaming.Seconds(2))

    // updateStateByKey is used, so a checkpoint directory is required
    ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint")

    // read from the socket opened by nc
    val linesDStream = ssc.socketTextStream("Hadoop01", 6666)
    val wordCountDStream = linesDStream
      // split on blanks
      .flatMap(_.split(" "))
      // generate (word, 1)
      .map((_, 1))
      .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
        // update the state information in real time
        var count = state.getOrElse(0)
        for (value <- values) {
          count += value
        }
        Option(count)
      })

    wordCountDStream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        rdd.foreachPartition(part => {
          // get a connection from the connection pool
          val connection = JDBCManager.getConnection()
          part.foreach(data => {
            // insert the wordcount information into the wordcount table;
            // if the word already exists, the on duplicate key update clause updates its count
            val sql =
              s"insert into wordcount(word,count) " +
                s"values('${data._1}',${data._2}) on duplicate key update count=${data._2}"
            // execute the SQL statement through a PreparedStatement
            val pstmt = connection.prepareStatement(sql)
            pstmt.executeUpdate()
          })
          // after the data has been written over this connection, return it to the connection pool
          JDBCManager.returnConnection(connection)
        })
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
Open nc and enter data
Querying the wordcount table from another terminal shows that the result changes in real time.
The above is an example analysis of SparkStreaming operator development. I hope you have learned something from it.