In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-25 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
[TOC]
Various transformationTransformation Meaningmap (func) of DStream perform func function operations on each element in DStream and then return a new DStream.flatMap (func) similar to the map method, except that each input item can be output to zero or more output items filter (func) filter out all function func return DStream elements with the value of true and return a new DStreamrepartition (numPartitions) to increase or decrease the number of partitions in DStream Thus changing the parallelism of DStream union (otherStream) merges the source DStream with the elements whose input parameter is otherDStream, and returns a new DStream.count () by counting the elements in each RDD in DStreaim, and then returning DStreamreduce (func) composed of RDD with only one element to aggregate the elements in each RDD in the source DStream using func. Then return the new DStream.countByValue () consisting of the RDD of only one element. For the DStream of element type K, the corresponding value of the new DStream,Long in the form of a key-value pair is the number of key occurrences of each RDD in the source DStream. ReduceByKey (func, [numTasks]) aggregates the key in the source DStream using the func function, and then returns the new (K) V) for the DStream of the constituent DStreamjoin (otherStream, [numTasks]) input as (KMagol V), (KMagol W), return a new (K, (VMagne W) type DStreamcogroup (otherStream, [numTasks]) input as (KMague V), (KMagol W) type DStream. Return a new (K, Seq [V], Seq [W]) tuple type DStreamtransform (func) that acts on each RDD in the source code DStream through the RDD-to-RDD function, which can be any RDD operation, thus returning a new RDDupdateStateByKey (func) updates the key based on the pre-state of key and the new value of key, and returns a DstreamWindow function with a new state:
You can see that many of the transformation operators already exist in RDD, so here we only focus on transform, updateStateByKey, and window functions.
Transform Operation DStream transform of transformation
1. Transform operation, which can be used to perform any RDD-to-RDD conversion operation when applied on DStream. It can be used to implement operations that are not provided in DStream API. For example, DStream API does not provide the operation of join each batch in a DStream with a specific RDD. But we can use the transform operation to implement this function ourselves.
2. DStream.join (), which can only join other DStream. After the RDD of each batch in DStream is calculated, it will join with the RDD of other DStream.
Case
The test code is as follows:
Package cn.xpleaf.bigdata.spark.scala.streaming.p1import org.apache.log4j. {Level, Logger} import org.apache.spark.SparkConfimport org.apache.spark.rdd.RDDimport org.apache.spark.streaming. {Seconds, StreamingContext} import org.apache.spark.streaming.dstream. {DStream, ReceiverInputDStream} / * use Transformation's transform to complete online blacklist filtering * requirements: * send log data from ip ["27.19.74.143" "110.52.250.126"] Real-time filter out * data format * 27.19.74.143 17:38:20##GET / static/image/common/faq.gif HTTP/1.1##200##1127 * / object _ 06SparkStreamingTransformOps {def main (args: Array [String]): Unit = {if (args = = null | | args.length
< 2) { System.err.println( """Parameter Errors! Usage: |hostname: 监听的网络socket的主机名或ip地址 |port: 监听的网络socket的端口 """.stripMargin) System.exit(-1) } Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf() .setAppName(_01SparkStreamingNetWorkOps.getClass.getSimpleName) .setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(2)) val hostname = args(0).trim val port = args(1).trim.toInt //黑名单数据 val blacklist = List(("27.19.74.143", true), ("110.52.250.126", true))// val blacklist = List("27.19.74.143", "110.52.250.126") val blacklistRDD:RDD[(String, Boolean)] = ssc.sparkContext.parallelize(blacklist) val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port) // 如果用到一个DStream和rdd进行操作,无法使用dstream直接操作,只能使用transform来进行操作 val filteredDStream:DStream[String] = linesDStream.transform(rdd =>{val ip2InfoRDD:RDD [(String, String)] = rdd.map {line = > {(line.split ("# #") (0), line)}} / * * A (M) B (N) two tables: * across join * Cross connection, connection without on condition Will produce Cartesian product (multiple records) can not use * inner join * equivalent join, take A table and B table intersection, that is, to obtain the data in An and B, no elimination can not use * left outer join * outer link: the most common is the left outer join (to retain all the data in the left table) The data in the right table can be displayed normally, but not in the right table. Display as null) * the result of inner join can be determined by non-empty left outer connection * / val joinedInfoRDD:RDD [(String, (String, option [Boolean]))] = ip2InfoRDD.leftOuterJoin (blacklistRDD) joinedInfoRDD.filter {case (ip, (line) Joined)) = > {joined = = None}} / / perform the filtering operation .map {case (ip, (line, joined)) = > line}}) filteredDStream.print () ssc.start () ssc.awaitTermination () ssc.stop () / boolean parameter in stop Set it to true, turn off the SparkContext corresponding to the ssc. Default is false, and only disable itself}}.
Generate data in nc:
[uplooking@uplooking01] $nc-lk 489327.19.74.143 / 2016-05-30 17:38:20##GET / data/attachment/common/c8/common_2_verify_icon.png HTTP/1.1##200##582110.52.250.126##2016-05-30 17:38:20##GET / static/js/logging.js?y7a HTTP/1.1##200##6038.35.201.144##2016-05-30 17:38: 20##GET / uc_server/avatar.php?uid=29331&size=middle HTTP/1.1##301##-
The output is as follows:
-Time: 1526006084000 ms---8.35.201.144##2016-05-30 17:38:20##GET / uc_server/avatar.php Overview of updateStateByKey operation of uid=29331&size=middle HTTP/1.1##301##-transformation
1. The updateStateByKey of Spark Streaming can do reduce operation by key according to the data in DStream, and then accumulate the data of each batch.
2. UpdateStateByKey interpretation
Do reduce operation by key with the data in DStream, and then accumulate the data of each batch. When new data information is entered or updated, users can keep any shape they want. Two steps are required to use this feature:
1) define the state: it can be any data type
2) define a state update function: use a function to specify how to use the previous state to update the state from the new value in the input stream. For stateful operations, it is necessary to constantly slice the current and historical time slices of the RDD accumulation calculation, with the loss of time, the scale of computing data will become larger and larger.
3. What you need to think about is that if the amount of data is very large, or if the performance requirements are extremely stringent, you can consider putting the data on Redis or tachyon or ignite
4. Note that the updateStateByKey operation requires that the Checkpoint mechanism be enabled.
Case Scala version
The test code is as follows:
Package cn.xpleaf.bigdata.spark.scala.streaming.p1import org.apache.log4j. {Level, Logger} import org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream. {DStream, ReceiverInputDStream} import org.apache.spark.streaming. {Seconds, StreamingContext} / * the state function updateStateByKey * updates the status of key (that is, the value corresponding to key) * * Calculate the status of a key up to the current location * count the corresponding count of the word so far * in order to complete the operation so far, you must accumulate the historical data and the latest data. So you need a place to store historical data * this place is the checkpoint directory * * / object _ 07SparkStreamingUpdateStateByKeyOps {def main (args: Array [String]): Unit = {if (args = = null | | args.length
< 2) { System.err.println( """Parameter Errors! Usage: |hostname: 监听的网络socket的主机名或ip地址 |port: 监听的网络socket的端口 """.stripMargin) System.exit(-1) } val hostname = args(0).trim val port = args(1).trim.toInt Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf() .setAppName(_07SparkStreamingUpdateStateByKeyOps.getClass.getSimpleName) .setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(2)) ssc.checkpoint("hdfs://ns1/checkpoint/streaming/usb") // 接收到的当前批次的数据 val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port) // 这是记录下来的当前批次的数据 val rbkDStream:DStream[(String, Int)] =linesDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_) val usbDStream:DStream[(String, Int)] = rbkDStream.updateStateByKey(updateFunc) usbDStream.print() ssc.start() ssc.awaitTermination() ssc.stop() // stop中的boolean参数,设置为true,关闭该ssc对应的SparkContext,默认为false,只关闭自身 } /** * @param seq 当前批次的key对应的数据 * @param history 历史key对应的数据,可能有可能没有 * @return */ def updateFunc(seq: Seq[Int], history: Option[Int]): Option[Int] = { var sum = seq.sum if(history.isDefined) { sum += history.get } Option[Int](sum) }} nc产生数据: [uplooking@uplooking01 ~]$ nc -lk 4893hello hellohello you hello he hello me 输出结果如下: -------------------------------------------Time: 1526009358000 ms-------------------------------------------(hello,2)18/05/11 11:29:18 INFO WriteAheadLogManager for Thread: Attempting to clear 0 old log files in hdfs://ns1/checkpoint/streaming/usb/receivedBlockMetadata older than 1526009338000: -------------------------------------------Time: 1526009360000 ms-------------------------------------------(hello,5)(me,1)(you,1)(he,1)18/05/11 11:29:20 INFO WriteAheadLogManager for Thread: Attempting to clear 0 old log files in hdfs://ns1/checkpoint/streaming/usb/receivedBlockMetadata older than 1526009340000: -------------------------------------------Time: 1526009362000 ms-------------------------------------------(hello,5)(me,1)(you,1)(he,1)Java版 用法略有不同,主要是 状态更新函数的写法上有区别,如下: package cn.xpleaf.bigdata.spark.java.streaming.p1;import com.google.common.base.Optional;import org.apache.log4j.Level;import org.apache.log4j.Logger;import org.apache.spark.SparkConf;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.JavaDStream;import org.apache.spark.streaming.api.java.JavaPairDStream;import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import scala.Tuple2;import java.util.Arrays;import java.util.List;public class _02SparkStreamingUpdateStateByKeyOps { public static void main(String[] args) { if(args == null || args.length < 2) { System.err.println("Parameter Errors! Usage: "); System.exit(-1); } Logger.getLogger("org.apache.spark").setLevel(Level.OFF); SparkConf conf = new SparkConf() .setAppName(_02SparkStreamingUpdateStateByKeyOps.class.getSimpleName()) .setMaster("local[2]"); JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(2)); jsc.checkpoint("hdfs://ns1/checkpoint/streaming/usb"); String hostname = args[0].trim(); int port = Integer.valueOf(args[1].trim()); JavaReceiverInputDStream lineDStream = jsc.socketTextStream(hostname, port);//默认的持久化级别:MEMORY_AND_DISK_SER_2 JavaDStream wordsDStream = lineDStream.flatMap(new FlatMapFunction() { @Override public Iterable call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); JavaPairDStream pairsDStream = wordsDStream.mapToPair(word ->{return new Tuple2 (word, 1);}); JavaPairDStream rbkDStream = pairsDStream.reduceByKey (new Function2 () {@ Override public Integer call (Integer v1, Integer v2) throws Exception {return v1 + v2;}}) / / cumulative operation JavaPairDStream usbDStream = rbkDStream.updateStateByKey (new Function2 () {@ Override public Optional call (List current, Optional history) throws Exception {int sum = 0; for (int I: current) {sum + = I) } if (history.isPresent ()) {sum + = history.get ();} return Optional.of (sum);}}); usbDStream.print (); jsc.start (); / / start streaming jsc.awaitTermination () / / wait for execution to end jsc.close ();}} window operation of transformation DStream window sliding window
Spark Streaming provides support for sliding window operations, which allows us to perform computational operations on the data in a sliding window. Each time the RDD data falls into the window, it is aggregated to perform the calculation operation, and then the generated RDD is used as a RDD of the window DStream. For example, in the following figure, a sliding window calculation is performed on the data every three seconds, and the three RDD in these three seconds will be aggregated for processing, and then two seconds later, the sliding window calculation will be performed on the data in the last three seconds. So for each sliding window operation, you must specify two parameters, the window length and the sliding interval, and both parameter values must be an integral multiple of the batch interval.
1. The red rectangle is a window, and the window hold is the data flow over a period of time.
two。 Each time is a unit of time. In the official example, every window size is 3 time unit, and every 2 units of time, the window will slide.
So for window-based operations, you need to specify two parameters:
Window length-The duration of the window (3 in the figure) slide interval-The interval at which the window-based operation is performed (2 in the figure). 1. Window size, personal feeling is a container of data for a period of time. two。 The sliding interval is the cron expression that we can understand. Let me give you an example: take the most famous example of wordcount, which counts the data over the past 30 seconds every 10 seconds. / / Reduce last 30 seconds of data, every 10 seconds val windowedWordCounts = pairs.reduceByKeyAndWindow (_ + _, Seconds (30), Seconds (10)) DSstream window sliding container function window performs custom calculations on the data of each sliding window countByWindow performs count operations on the data of each sliding window reduceByWindow performs reduce operations on the data of each sliding window reduceByKeyAndWindow performs reduceByKey operations on the data of each sliding window countByValueAndWindow performs countByValue operations on the data of each sliding window
The test code is as follows:
Package cn.xpleaf.bigdata.spark.scala.streaming.p1import org.apache.log4j. {Level, Logger} import org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream. {DStream, ReceiverInputDStream} import org.apache.spark.streaming. {Seconds StreamingContext} / * window function window * how often (sliding frequency slideDuration) counts the data in the past (window length windowDuration) * what you need to pay attention to is window length and sliding frequency * windowDuration = M*batchInterval SlideDuration = N*batchInterval * / object _ 08SparkStreamingWindowOps {def main (args: Array [String]): Unit = {if (args = = null | | args.length
< 2) { System.err.println( """Parameter Errors! Usage: |hostname: 监听的网络socket的主机名或ip地址 |port: 监听的网络socket的端口 """.stripMargin) System.exit(-1) } val hostname = args(0).trim val port = args(1).trim.toInt Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf() .setAppName(_08SparkStreamingWindowOps.getClass.getSimpleName) .setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(2)) // 接收到的当前批次的数据 val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port) val pairsDStream:DStream[(String, Int)] =linesDStream.flatMap(_.split(" ")).map((_, 1)) // 每隔4s,统计过去6s中产生的数据 val retDStream:DStream[(String, Int)] = pairsDStream.reduceByKeyAndWindow(_+_, windowDuration = Seconds(6), slideDuration = Seconds(4)) retDStream.print() ssc.start() ssc.awaitTermination() ssc.stop() // stop中的boolean参数,设置为true,关闭该ssc对应的SparkContext,默认为false,只关闭自身 }} nc产生数据: [uplooking@uplooking01 ~]$ nc -lk 4893hello youhello hehello mehello youhello he 输出结果如下: -------------------------------------------Time: 1526016316000 ms-------------------------------------------(hello,4)(me,1)(you,2)(he,1)-------------------------------------------Time: 1526016320000 ms-------------------------------------------(hello,5)(me,1)(you,2)(he,2)-------------------------------------------Time: 1526016324000 ms-------------------------------------------DStream的output操作以及foreachRDDDStream output操作 1、print 打印每个batch中的前10个元素,主要用于测试,或者是不需要执行什么output操作时,用于简单触发一下job。 2、saveAsTextFile(prefix, [suffix]) 将每个batch的数据保存到文件中。每个batch的文件的命名格式为:prefix-TIME_IN_MS[.suffix] 3、saveAsObjectFile 同上,但是将每个batch的数据以序列化对象的方式,保存到SequenceFile中。 4、saveAsHadoopFile 同上,将数据保存到Hadoop文件中 5、foreachRDD 最常用的output操作,遍历DStream中的每个产生的RDD,进行处理。可以将每个RDD中的数据写入外部存储,比如文件、数据库、缓存等。通常在其中,是针对RDD执行action操作的,比如foreach。 DStream foreachRDD详解 相关内容其实在Spark开发调优中已经有相关的说明。 通常在foreachRDD中,都会创建一个Connection,比如JDBC Connection,然后通过Connection将数据写入外部存储。 误区一:在RDD的foreach操作外部,创建Connection 这种方式是错误的,因为它会导致Connection对象被序列化后传输到每个Task中。而这种Connection对象,实际上一般是不支持序列化的,也就无法被传输。 dstream.foreachRDD { rdd =>Val connection = createNewConnection () rdd.foreach {record = > connection.send (record)} mistake 2: create a Connection inside the foreach operation of RDD
This approach is OK, but it is inefficient. Because it causes a Connection object to be created for each piece of data in the RDD. Generally speaking, the creation of Connection is very performance-consuming.
Dstream.foreachRDD {rdd = > rdd.foreach {record = > val connection = createNewConnection () connection.send (record) connection.close ()} rational use of DStream foreachRDD
One reasonable way: use RDD's foreachPartition operation, and within that operation, create a Connection object, which is equivalent to creating a Connection object for each partition of RDD, which saves a lot of resources.
Dstream.foreachRDD {rdd = > rdd.foreachPartition {partitionOfRecords = > val connection = createNewConnection () partitionOfRecords.foreach (record = > connection.send (record)) connection.close ()}
Reasonable way 2: manually encapsulate a static connection pool, use the foreachPartition operation of RDD, and within the operation, obtain a connection from the static connection pool through static method, and then return it after use. In this way, connections can be reused even between the partition of multiple RDD. And you can let the connection pool adopt the strategy of lazy creation, and release it after being idle for a period of time.
Dstream.foreachRDD {rdd = > rdd.foreachPartition {partitionOfRecords = > val connection = ConnectionPool.getConnection () partitionOfRecords.foreach (record = > connection.send (record)) ConnectionPool.returnConnection (connection)}} foreachRDD and foreachPartition implementation
It is important to note that:
(1) you'd better use the forEachPartition function to traverse the RDD and create a database connection on each Work.
(2) if the concurrency of your database is limited, you can reduce concurrency by controlling the partitioning of the data.
(3) it is best to use bulk insert when inserting MySQL.
(4) make sure that the database process you write can handle failure, because your process of inserting the database may pass through the network, which may cause the data to be inserted into the database to fail.
(5) it is not recommended to write your RDD data to relational databases such as MySQL.
This section can actually refer to the case of the development tuning section, but there is no foreachRDD because it does not use DStream, but the principle is the same, because in the end it is all for RDD.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.