Example Analysis of Spark Streaming operator Development


This article shares an example analysis of Spark Streaming operator development. The editor finds it quite practical and shares it here for reference; follow along and have a look.

An example of Spark Streaming operator development

Transform operator development

When the transform operation is applied to a DStream, it can perform any RDD-to-RDD conversion, which also makes it possible to implement operations that the DStream API does not provide. For example, the DStream API offers no way to join each batch of a DStream with a specific RDD: the join operator on a DStream can only join another DStream. With the transform operation, however, this can be achieved.

Example: real-time filtering of blacklist users

package StreamingDemo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Real-time blacklist filtering
  */
object TransformDemo {
  def main(args: Array[String]): Unit = {
    // set the log level
    Logger.getLogger("org").setLevel(Level.WARN)
    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    // create the blacklist RDD
    val blackRDD = ssc.sparkContext.parallelize(Array(("zs", true), ("lisi", true)))

    // receive data from nc through the socket
    val linesDStream = ssc.socketTextStream("Hadoop01", 6666)

    /**
      * Filter out the statements of blacklisted users, e.g.
      *   zs sb sb sb sb
      *   lisi fuck fuck fuck
      *   jack hello
      */
    linesDStream
      .map(x => {
        val info = x.split(" ")
        (info(0), info.toList.tail.mkString(" "))
      })
      .transform(rdd => {
        // transform is an RDD -> RDD operation, so it must return an RDD
        /**
          * After the leftOuterJoin the result looks like:
          *   (zs, (sb sb sb sb, Some(true)))
          *   (lisi, (fuck fuck fuck, Some(true)))
          *   (jack, (hello, None))
          */
        val joinRDD = rdd.leftOuterJoin(blackRDD)
        // Some(true) marks a blacklisted user and None a user not on the blacklist,
        // so keep only the non-blacklisted users
        val filterRDD = joinRDD.filter(x => x._2._2.isEmpty)
        filterRDD
      })
      .map(x => (x._1, x._2._1))
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Test

Start nc and feed in user names and their messages.

You can see that the program filters out the comments of blacklisted users in real time.

UpdateStateByKey operator development

The updateStateByKey operator maintains a state for each key and keeps updating it as new data arrives. For every batch, Spark applies the state update function to each pre-existing key, whether or not that key has new values in the batch; if the update function returns None, the state for that key is removed. The update function is also executed for keys appearing for the first time.

Using this operator takes two steps:

Define the state: the state can be of any data type.

Define the state update function: a function that specifies how to combine the previous state with the new values from the input stream to produce the updated state.

Note: the updateStateByKey operation requires that the checkpoint mechanism be enabled.
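A minimal sketch of the two steps (assuming pairsDStream is a DStream[(String, Int)] like the one built in the example below, that checkpointing is already enabled, and with a purely illustrative rule for when a key's state is dropped):

    // step 1: the state here is an Int (the running count for each word)
    // step 2: the update function combines the previous state with the new values of the current batch
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      if (values.isEmpty) {
        None // this key received no new data in this batch: returning None deletes its state
      } else {
        Some(state.getOrElse(0) + values.sum) // otherwise fold the new values into the previous state
      }
    }
    val stateDStream = pairsDStream.updateStateByKey(updateFunc)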

Example: real-time WordCount based on cache

package StreamingDemo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Cache-based real-time WordCount: counts word occurrences in the global scope
  */
object UpdateStateByKeyDemo {
  def main(args: Array[String]): Unit = {
    // set the log level
    Logger.getLogger("org").setLevel(Level.WARN)

    /**
      * If security authentication is not enabled, or the user obtained from Kerberos is null,
      * Hadoop reads the HADOOP_USER_NAME environment variable and uses its value as the
      * executing user. This experiment runs without security authentication, so even without
      * setting it explicitly my own user name is picked up automatically.
      */
    // System.setProperty("HADOOP_USER_NAME", "Setsuna")

    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    // set the checkpoint storage path
    ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint")

    // create the input DStream
    val lineDStream = ssc.socketTextStream("Hadoop01", 6666)
    val wordDStream = lineDStream.flatMap(_.split(" "))
    val pairsDStream = wordDStream.map((_, 1))

    /**
      * state:  the previous state value for the key
      * values: the values for the key in the current batch
      */
    val resultDStream = pairsDStream.updateStateByKey(
      (values: Seq[Int], state: Option[Int]) => {
        // when state is None the word has not been counted yet, so start the counter at 0
        var count = state.getOrElse(0)
        // traverse values and accumulate the counts of the new occurrences
        for (value <- values) {
          count += value
        }
        Some(count)
      })

    resultDStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

The same (word, 1) pairs can also be reduced over a sliding window (note that the window and slide durations must be multiples of the batch interval of the StreamingContext), with a transform picking out the top three words in each window:

    pairsDStream
      .reduceByKeyAndWindow((x: Int, y: Int) => x + y,
        // window length is 5 seconds
        Seconds(5),
        // sliding interval is 2 seconds
        Seconds(2))
      .transform(rdd => {
        // the transform operator converts one rdd into another:
        // sort by the number of occurrences in descending order and take the top three search words
        val info: Array[(String, Int)] = rdd.sortBy(_._2, false).take(3)
        // convert the Array back into an RDD
        val resultRDD = ssc.sparkContext.parallelize(info)
        resultRDD
      })
      .map(x => s"${x._1} appears: ${x._2}")
      .print()

Test result

Overview of DStream output operations

Spark Streaming allows DStream data to be exported to external systems. All computation in a DStream is triggered by its output operations, and foreachRDD in particular must perform an action on the RDD it receives in order to trigger the computation logic for each batch.

The output operations and their descriptions are as follows:

print()
Prints the first 10 elements of each batch of data in the DStream on the Driver. It is mainly used for testing, or to simply trigger a job when no other output operation is needed.

saveAsTextFiles(prefix, [suffix])
Saves the contents of the DStream as text files. The files generated during each batch interval are named prefix-TIME_IN_MS[.suffix].

saveAsObjectFiles(prefix, [suffix])
Serializes the contents of the DStream into objects and saves them in SequenceFile format. The files generated during each batch interval are named prefix-TIME_IN_MS[.suffix].

saveAsHadoopFiles(prefix, [suffix])
Saves the contents of the DStream as Hadoop files. The files generated during each batch interval are named prefix-TIME_IN_MS[.suffix].

foreachRDD(func)
The most basic output operation. Applies the function func to each RDD in the DStream; this can output the data to an external system, such as saving the RDD to files or to a database over the network. Note that func is executed in the Driver process of the streaming application.
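A minimal sketch of how the first two of these look in code (wordCountDStream and the HDFS output path are assumed names, not taken from the examples in this article):

    // print the first 10 elements of each batch on the Driver, mainly for testing
    wordCountDStream.print()
    // write each batch as text files; files are named wordcount-TIME_IN_MS.txt under the given prefix
    wordCountDStream.saveAsTextFiles("hdfs://Hadoop01:9000/output/wordcount", "txt")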

ForeachRDD operator development

foreachRDD is the most commonly used output operation. It traverses each RDD generated by the DStream so that the data in each RDD can be written to external storage such as files, databases, or caches; inside it, an action such as foreach is usually performed on the RDD.

Using foreachRDD to write to a database

Usually a Connection, such as a JDBC Connection, is created inside foreachRDD, and the data is then written to external storage through that Connection.

Mistake 1: creating the Connection outside the RDD's foreach operation

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()
  rdd.foreach { record =>
    connection.send(record)
  }
}

This approach is wrong: it causes the Connection object to be serialized and shipped to every task, but Connection objects are not serializable, so they cannot be transferred.

Mistake 2: creating the Connection inside the RDD's foreach operation

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

Although this approach works, it is inefficient: a Connection object is created for every record in the RDD, and creating Connection objects usually carries a significant performance cost.

Reasonable approaches

The first: use the RDD's foreachPartition operation and create the Connection object inside it. This creates one Connection object per partition of the RDD, which saves a lot of resources (a sketch follows these two approaches).

The second: manually encapsulate a static connection pool, use the RDD's foreachPartition operation, obtain a connection from the pool through a static method, and return it to the pool once it has been used. This way connections can be reused across the partitions of multiple RDDs.
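A minimal sketch of the first approach (createNewConnection and connection.send are the same placeholder calls used in the snippets above, standing in for real JDBC or client code):

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // one Connection per partition instead of one per record
        val connection = createNewConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        connection.close()
      }
    }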

Example: real-time global WordCount, saving the results to a MySQL database

The MySQL table is created with the following statement:

CREATE TABLE wordcount (
  word varchar(100) CHARACTER SET utf8 NOT NULL,  -- column length assumed; it was lost from the original statement
  count int(10) NOT NULL,
  PRIMARY KEY (word)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

Add mysql-connector-java-5.1.40-bin.jar to IDEA
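If the project is built with sbt instead of adding the jar by hand, the same driver can be pulled in with a dependency roughly like this (version matching the jar above):

    // build.sbt
    libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.40"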

The code is as follows

A note on the connection-pool code: the first idea was to build a single pool in a static block and hand out connections from it directly, but to cope with the pool running out of connections the approach below works better. A pool is instantiated and connections are taken from it as they are requested; when all the connections have been handed out and the pool is empty, a new batch of connections is instantiated.

package StreamingDemo

import java.sql.{Connection, DriverManager, SQLException}
import java.util

object JDBCManager {
  var connectionQue: java.util.LinkedList[Connection] = null

  /**
    * Get a connection object from the database connection pool
    * @return
    */
  def getConnection(): Connection = {
    synchronized({
      try {
        // if the connection pool is empty, instantiate a linked list of Connection type
        if (connectionQue == null) {
          connectionQue = new util.LinkedList[Connection]()
          for (i <- 0 until 10) { // pool size assumed to be 10; the original value was lost
            // JDBC URL, user and password are assumed values; the originals were lost
            val connection = DriverManager.getConnection(
              "jdbc:mysql://Hadoop01:3306/test?characterEncoding=utf-8",
              "root",
              "root")
            connectionQue.push(connection)
          }
        }
      } catch {
        case e: SQLException => e.printStackTrace()
      }
      // if the connection pool is not empty, return the head element and remove it from the linked list
      return connectionQue.poll()
    })
  }

  /**
    * When a connection object is no longer needed, call this method to return it to the pool
    * @param connection
    */
  def returnConnection(connection: Connection) = {
    // insert the element back into the pool
    connectionQue.push(connection)
  }

  def main(args: Array[String]): Unit = {
    // test getConnection in the main method
    getConnection()
    println(connectionQue.size())
  }
}

Wordcount code

package StreamingDemo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, streaming}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ForeachRDDDemo {
  def main(args: Array[String]): Unit = {
    // set the log level to avoid too much INFO output
    Logger.getLogger("org").setLevel(Level.WARN)

    // set the Hadoop user name; this also works without it
    System.setProperty("HADOOP_USER_NAME", "Setsuna")

    // basic Spark configuration
    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[2]")
    val ssc = new StreamingContext(conf, streaming.Seconds(2))

    // updateStateByKey is used below, so a checkpoint directory is required
    ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint")

    // socket source, matching the nc configuration
    val linesDStream = ssc.socketTextStream("Hadoop01", 6666)

    val wordCountDStream = linesDStream
      .flatMap(_.split(" ")) // split on whitespace
      .map((_, 1))           // generate (word, 1) pairs
      .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
        // update the state in real time
        var count = state.getOrElse(0)
        for (value <- values) {
          count += value
        }
        Some(count)
      })

    wordCountDStream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        rdd.foreachPartition(part => {
          // get a connection from the pool
          val connection = JDBCManager.getConnection()
          part.foreach(data => {
            // insert the word count into the wordcount table;
            // the on duplicate key update clause updates the row if the word already exists
            val sql =
              s"insert into wordcount (word,count) " +
                s"values ('${data._1}', ${data._2}) on duplicate key update count=${data._2}"
            // run the SQL statement through a PreparedStatement
            val pstmt = connection.prepareStatement(sql)
            pstmt.executeUpdate()
          })
          // after the data has been written, return the connection to the pool
          JDBCManager.returnConnection(connection)
        })
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

Open nc and enter data

Querying the wordcount table from another terminal shows the result changing in real time.

Thank you for reading! That is all for "Example Analysis of Spark Streaming Operator Development". I hope the content above has been of some help and lets you learn a little more. If you think the article is good, feel free to share it so more people can see it.
