Case Analysis of Spark Streaming Operator Development

Updated: 2025-02-22. Source: SLTechnology News&Howtos (Shulou.com), Development

This article walks through Spark Streaming operator development with worked examples: the transform and updateStateByKey operators, the DStream output operations, and foreachRDD.

Examples of Spark Streaming operator development

Transform operator development

The transform operation applies an arbitrary RDD-to-RDD function to each batch of a DStream, so it can express operations that the DStream API does not provide directly. For example, the DStream API has no operation for joining each batch of a DStream with a specific RDD: the join operator on a DStream can only join with another DStream. With transform, the join can be performed on the underlying RDD of each batch.

Example: real-time filtering of blacklist users

    package StreamingDemo

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
      * Real-time blacklist filtering
      */
    object TransformDemo {
      def main(args: Array[String]): Unit = {
        // Set the log level
        Logger.getLogger("org").setLevel(Level.WARN)

        val conf = new SparkConf()
          .setAppName(this.getClass.getSimpleName)
          .setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(2))

        // Create the blacklist RDD
        val blackRDD = ssc.sparkContext.parallelize(Array(("zs", true), ("lisi", true)))

        // Read the data sent by nc over a socket
        val linesDStream = ssc.socketTextStream("Hadoop01", 6666)

        /**
          * Filter out the comments of blacklisted users. Sample input:
          * zs sb sb sb sb
          * lisi fuck fuck fuck
          * jack hello
          */
        linesDStream
          .map(x => {
            // The first token is the user name, the rest is the comment
            val info = x.split(" ")
            (info(0), info.toList.tail.mkString(" "))
          })
          .transform(rdd => {
            // transform is an RDD -> RDD operation, so the return value must be an RDD
            /**
              * After the leftOuterJoin, the result is as follows:
              * (zs,(sb sb sb sb,Some(true)))
              * (lisi,(fuck fuck fuck,Some(true)))
              * (jack,(hello,None))
              */
            val joinRDD = rdd.leftOuterJoin(blackRDD)

            // Some(true) means the user is blacklisted, None means the user is not.
            // Keep only the users that are not on the blacklist
            val filterRDD = joinRDD.filter(x => x._2._2.isEmpty)
            filterRDD
          })
          .print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

Test

Start nc and send lines that consist of a user name followed by that user's comment.

The program filters out the comments of blacklisted users in real time.
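The join-and-filter step inside transform can be checked without a cluster. The following is a minimal sketch in plain Scala collections, not Spark code: `leftOuterJoin` here is a hypothetical helper that only mimics the shape of the RDD result when blacklist keys are unique, and the sample comments are made up for illustration.

```scala
object BlacklistFilterSketch {
  // Hypothetical stand-in for RDD.leftOuterJoin: every left record is kept,
  // and the matching right-hand value (if any) is wrapped in an Option.
  def leftOuterJoin[K, V, W](left: Seq[(K, V)], right: Map[K, W]): Seq[(K, (V, Option[W]))] =
    left.map { case (k, v) => (k, (v, right.get(k))) }

  // Keep only the users with no blacklist entry, then drop the join marker.
  def filterBlacklisted(comments: Seq[(String, String)],
                        blacklist: Map[String, Boolean]): Seq[(String, String)] =
    leftOuterJoin(comments, blacklist)
      .filter { case (_, (_, flag)) => flag.isEmpty }
      .map { case (user, (comment, _)) => (user, comment) }

  def main(args: Array[String]): Unit = {
    val blacklist = Map("zs" -> true, "lisi" -> true)
    val batch = Seq(("zs", "first comment"), ("lisi", "second comment"), ("jack", "hello"))
    filterBlacklisted(batch, blacklist).foreach(println) // prints (jack,hello)
  }
}
```

The final `map` strips the `Option` so that only `(user, comment)` pairs remain; the same step could be appended after transform in the streaming job if the join marker is not wanted in the output.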

updateStateByKey operator development

The updateStateByKey operator maintains arbitrary state for each key and keeps it up to date as new data arrives. For every batch, Spark applies the state update function to each existing key, whether or not that key has new values in the batch. If the update function returns None, the state for that key is deleted. The update function is also applied to keys that appear for the first time.

To use this operator, take two steps:

1. Define the state. The state can be of any data type.
2. Define the state update function. This function specifies how to combine the previous state with the new values from the input stream to produce the updated state.

Note: the updateStateByKey operation requires the checkpoint mechanism to be enabled.
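The two steps above can be sketched in plain Scala, without Spark. This is only an illustration under stated assumptions: the state is an `Int` count, `updateCount` has the `(Seq[V], Option[S]) => Option[S]` shape that updateStateByKey expects, and `applyBatch` is a hypothetical helper that imitates how the function is applied to every key in each batch.

```scala
object UpdateStateSketch {
  // State update function: new values for a key in this batch plus the
  // previous state produce the new state. Returning None would drop the key.
  def updateCount(values: Seq[Int], state: Option[Int]): Option[Int] =
    Some(state.getOrElse(0) + values.sum)

  // Hypothetical simulation of one batch: the update function runs for every
  // key that already has state or that has new values in the batch.
  def applyBatch(state: Map[String, Int], batch: Seq[(String, Int)]): Map[String, Int] = {
    val grouped = batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    val keys = state.keySet ++ grouped.keySet
    keys.flatMap { k =>
      updateCount(grouped.getOrElse(k, Seq.empty), state.get(k)).map(k -> _)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val batch1 = Seq("spark" -> 1, "spark" -> 1, "flink" -> 1)
    val batch2 = Seq("spark" -> 1)
    val s1 = applyBatch(Map.empty, batch1)
    val s2 = applyBatch(s1, batch2)
    println(s2("spark")) // 3
    println(s2("flink")) // 1, carried over although absent from batch2
  }
}
```

Because the function is called even for keys with no new values, a key's state survives quiet batches unless the function explicitly returns `None`.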

Example: real-time WordCount based on cache

    package StreamingDemo

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
      * Real-time WordCount based on cached state, counting word occurrences globally
      */
    object UpdateStateByKeyDemo {
      def main(args: Array[String]): Unit = {
        // Set the log level
        Logger.getLogger("org").setLevel(Level.WARN)

        /**
          * If security authentication is not enabled, or the user obtained from Kerberos is null,
          * Hadoop reads the HADOOP_USER_NAME environment variable and uses its value as the
          * executing user. In my experiment, the user name was picked up automatically even
          * without setting it explicitly and without security authentication enabled.
          */
        // System.setProperty("HADOOP_USER_NAME", "Setsuna")

        val conf = new SparkConf()
          .setAppName(this.getClass.getSimpleName)
          .setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(2))

        // Set the path where checkpoints are stored
        ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint")

        // Create the input DStream
        val lineDStream = ssc.socketTextStream("Hadoop01", 6666)
        val wordDStream = lineDStream.flatMap(_.split(" "))
        val pairsDStream = wordDStream.map((_, 1))

        /**
          * state:  the previous state value
          * values: the values for the key in the current batch
          */
        val resultDStream = pairsDStream.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
          // When state is None the word has not been counted yet, so the counter starts at 0
          var count = state.getOrElse(0)
          // Iterate through values, adding the new occurrences to the counter
          for (value <- values) {
            count += value
          }
          Some(count)
        })

        resultDStream.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

The source continues with the tail of a window-based example that reports the top three search words. Its beginning is not preserved, so only the surviving fragment is shown:

          ... x + y,
          // Window length is 5 seconds
          Seconds(5),
          // Slide interval is 2 seconds
          Seconds(2))
          .transform(rdd => {
            // transform processes an RDD and converts it into another RDD
            // Sort by number of occurrences in descending order and take the top three search words
            val info: Array[(String, Int)] = rdd.sortBy(_._2, false).take(3)
            // Convert the Array back into an RDD
            val resultRDD = ssc.sparkContext.parallelize(info)
            resultRDD
          })
          .map(x => s"${x._1} appears: ${x._2}")
          .print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

Test result

Overview of DStream Output operation

Spark Streaming allows the data in a DStream to be written to external systems. All computation on a DStream is triggered by output operations. With foreachRDD, an action must also be performed on the RDD inside it, otherwise the computation for each batch is not triggered.

Output operation | Description

print() | Prints the first 10 elements of each batch of the DStream on the driver. It is mainly used for testing, or to simply trigger a job when no other output operation is needed.

saveAsTextFiles(prefix, [suffix]) | Saves the contents of the DStream as text files. The files generated for each batch interval are named prefix-TIME_IN_MS[.suffix].

saveAsObjectFiles(prefix, [suffix]) | Serializes the contents of the DStream as objects and saves them in SequenceFile format. The files generated for each batch interval are named prefix-TIME_IN_MS[.suffix].

saveAsHadoopFiles(prefix, [suffix]) | Saves the contents of the DStream as Hadoop files. The files generated for each batch interval are named prefix-TIME_IN_MS[.suffix].

foreachRDD(func) | The most basic output operation. It applies the function func to each RDD in the DStream, and func writes the data to an external system, for example by saving the RDD to files or writing it over the network to a database. Note that func itself is executed in the driver process running the streaming application.

foreachRDD operator development

foreachRDD is the most commonly used output operation. It iterates over every RDD generated in a DStream and writes the data in each RDD to external storage such as files, databases, or caches. An RDD action, such as foreach, is usually performed inside it.

Using foreachRDD to write to a database

Typically a connection, such as a JDBC Connection, is created inside foreachRDD, and the data is written to external storage through that connection.

Pitfall 1: creating the Connection outside the RDD's foreach operation

    dstream.foreachRDD { rdd =>
      val connection = createNewConnection() // created on the driver
      rdd.foreach { record =>
        connection.send(record) // used inside the tasks
      }
    }

This approach is wrong. The Connection object would have to be serialized and shipped to every task, but Connection objects do not support serialization, so it cannot be transferred.
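The serialization failure can be reproduced without Spark. The sketch below is an illustration only: `FakeConnection` and `SendTask` are hypothetical classes, and plain Java serialization is used here on the assumption that it behaves like the serialization Spark applies to task closures.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureSerializationSketch {
  // Hypothetical stand-in for a connection: like a real socket or JDBC
  // connection, it does not implement java.io.Serializable.
  class FakeConnection {
    def send(record: String): Unit = println(s"sent $record")
  }

  // A task that captures the connection, the way the function passed to
  // rdd.foreach captures a connection created outside of it.
  class SendTask(val connection: FakeConnection) extends Serializable {
    def run(record: String): Unit = connection.send(record)
  }

  // Attempt Java serialization and report whether it succeeded.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(canSerialize(new SendTask(new FakeConnection))) // false
    println(canSerialize("a plain string"))                 // true
  }
}
```

The task class itself is serializable, but the captured connection is not, which is enough to make the whole object graph fail.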

Pitfall 2: creating the Connection inside the RDD's foreach operation

    dstream.foreachRDD { rdd =>
      rdd.foreach { record =>
        val connection = createNewConnection()
        connection.send(record)
        connection.close()
      }
    }

This approach works, but it is inefficient: a Connection object is created for every record in the RDD, and creating a Connection is usually expensive.

Recommended approaches

1. Use the RDD's foreachPartition operation and create the Connection object inside it. This creates one Connection per RDD partition, which saves a lot of resources.
2. Encapsulate a static connection pool yourself. Use the RDD's foreachPartition operation, obtain a connection from the static pool inside it through a static method, and return the connection to the pool after use. In this way connections can be reused across multiple partitions of the RDD.
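The saving from the per-partition pattern can be made concrete with a small simulation. This is a sketch in plain Scala, not Spark code: partitions are modeled as nested sequences, and `CountingConnection` is a hypothetical class that only counts how many connections are opened.

```scala
import java.util.concurrent.atomic.AtomicInteger

object ConnectionCountSketch {
  // Hypothetical connection that increments a counter when it is opened.
  final class CountingConnection(counter: AtomicInteger) {
    counter.incrementAndGet()
    def send(record: String): Unit = ()
    def close(): Unit = ()
  }

  // One connection per record: the inefficient pattern.
  def perRecord(partitions: Seq[Seq[String]]): Int = {
    val opened = new AtomicInteger(0)
    partitions.foreach { part =>
      part.foreach { record =>
        val c = new CountingConnection(opened)
        c.send(record)
        c.close()
      }
    }
    opened.get()
  }

  // One connection per partition: the foreachPartition pattern.
  def perPartition(partitions: Seq[Seq[String]]): Int = {
    val opened = new AtomicInteger(0)
    partitions.foreach { part =>
      val c = new CountingConnection(opened)
      try part.foreach(c.send) finally c.close()
    }
    opened.get()
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq("a", "b", "c"), Seq("d", "e"))
    println(perRecord(partitions))    // 5
    println(perPartition(partitions)) // 2
  }
}
```

With a connection pool, the number of connections opened can drop further, since a connection returned by one partition can be handed to the next.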

Example: real-time global WordCount, saving the results to a MySQL database

The MySQL table is created with the following statement

    CREATE TABLE wordcount (
        -- the varchar length is missing in the source; 255 is an assumption
        word  varchar(255) CHARACTER SET utf8 NOT NULL,
        count int(10) NOT NULL,
        PRIMARY KEY (word)
    ) ENGINE=InnoDB DEFAULT CHARSET=latin1;

Add mysql-connector-java-5.1.40-bin.jar to the project in IDEA

The code is as follows

For the connection pool, my first idea was to build the pool once in a static block and take connections from it directly. Taking into account the case where the pool runs out of connections, the following approach works better: a pool is instantiated the first time a connection is requested, and once all of its connections have been taken and the pool is empty, another pool is instantiated.

    package StreamingDemo

    import java.sql.{Connection, DriverManager, SQLException}
    import java.util

    object JDBCManager {
      var connectionQue: java.util.LinkedList[Connection] = null

      // Placeholder settings: the pool size, JDBC URL and credentials are not
      // preserved in the source, so the values below are assumptions
      val poolSize = 5
      val url = "jdbc:mysql://<host>:3306/<database>"
      val user = "<user>"
      val password = "<password>"

      /**
        * Get a connection object from the database connection pool
        * @return
        */
      def getConnection(): Connection = {
        synchronized({
          try {
            // If the pool does not exist yet or is empty, instantiate a linked list of Connection
            if (connectionQue == null || connectionQue.isEmpty) {
              connectionQue = new util.LinkedList[Connection]()
              for (i <- 0 until poolSize) {
                connectionQue.push(DriverManager.getConnection(url, user, password))
              }
            }
          } catch {
            case e: SQLException => e.printStackTrace()
          }
          // Return the head element and remove it from the linked list
          connectionQue.poll()
        })
      }

      /**
        * When a connection is no longer needed, call this method to return it to the pool
        * @param connection
        */
      def returnConnection(connection: Connection) = {
        synchronized({
          // Insert the element at the head of the list
          connectionQue.push(connection)
        })
      }

      def main(args: Array[String]): Unit = {
        // main method for testing
        getConnection()
        println(connectionQue.size())
      }
    }

WordCount code

    package StreamingDemo

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{SparkConf, streaming}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object ForeachRDDDemo {
      def main(args: Array[String]): Unit = {
        // Set the log level to avoid too much INFO output
        Logger.getLogger("org").setLevel(Level.WARN)

        // Set the Hadoop user; this line can be omitted
        System.setProperty("HADOOP_USER_NAME", "Setsuna")

        // Basic Spark configuration
        val conf = new SparkConf()
          .setAppName(this.getClass.getSimpleName)
          .setMaster("local[2]")
        val ssc = new StreamingContext(conf, streaming.Seconds(2))

        // updateStateByKey is used, so checkpointing is required
        ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint")

        // Read from the socket served by nc
        val linesDStream = ssc.socketTextStream("Hadoop01", 6666)
        val wordCountDStream = linesDStream
          .flatMap(_.split(" ")) // split each line on spaces
          .map((_, 1))           // generate (word, 1)
          .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
            // Update the state in real time
            var count = state.getOrElse(0)
            for (value <- values) {
              count += value
            }
            Some(count)
          })

        wordCountDStream.foreachRDD(rdd => {
          if (!rdd.isEmpty()) {
            rdd.foreachPartition(part => {
              // Get a connection from the connection pool
              val connection = JDBCManager.getConnection()
              part.foreach(data => {
                // Insert the word count into the wordcount table;
                // with on duplicate key update, an existing row is updated instead of inserted
                val sql =
                  s"insert into wordcount (word,count) " +
                    s"values ('${data._1}', ${data._2}) on duplicate key update count=${data._2}"
                // Execute the SQL statement through a PreparedStatement
                val pstmt = connection.prepareStatement(sql)
                pstmt.executeUpdate()
                pstmt.close()
              })
              // After the data has been written, return the connection to the pool
              JDBCManager.returnConnection(connection)
            })
          }
        })

        ssc.start()
        ssc.awaitTermination()
      }
    }

Open nc and enter data

Querying the wordcount table from another terminal shows the counts changing in real time.
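The example builds its SQL by interpolating each word into the statement text, which breaks on words containing a quote character. A possible alternative, sketched here under assumptions, binds the values through `?` placeholders and sends one batch per partition. `WordCountUpsertSketch` and `writePartition` are hypothetical names; only standard `java.sql` calls are used, and the table is the wordcount table defined earlier.

```scala
import java.sql.Connection

object WordCountUpsertSketch {
  // Same upsert as in the example, with placeholders instead of interpolated values.
  val UpsertSql: String =
    "insert into wordcount (word, count) values (?, ?) " +
      "on duplicate key update count = ?"

  // Writes all records of one partition with a single statement and one batch.
  def writePartition(connection: Connection, records: Iterator[(String, Int)]): Unit = {
    val pstmt = connection.prepareStatement(UpsertSql)
    try {
      records.foreach { case (word, count) =>
        pstmt.setString(1, word)
        pstmt.setInt(2, count)
        pstmt.setInt(3, count)
        pstmt.addBatch()
      }
      pstmt.executeBatch()
    } finally {
      pstmt.close()
    }
  }
}
```

Inside `rdd.foreachPartition`, this would be called as `writePartition(connection, part)` between `JDBCManager.getConnection()` and `JDBCManager.returnConnection(connection)`, ideally with the return placed in a `finally` block so the connection goes back to the pool even when a write fails.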

This concludes the case analysis of Spark Streaming operator development.
